In [21]:
# Sanity check: `sc` is not defined in this notebook; it is presumably
# pre-created by the PySpark kernel (the output shows a SparkContext).
sc

<pyspark.context.SparkContext at 0x11244c110>

In [152]:
import csv
from StringIO import StringIO
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from datetime import datetime
from pyspark.sql.types import TimestampType
# SQLContext on top of the SparkContext; used below for createDataFrame,
# temp-table registration and SQL queries.
sqlContext = SQLContext(sc)

In [153]:
def split(line):
    """
    Parse a single CSV-formatted line into a list of field strings.

    Uses the csv module so that quoted fields containing commas are kept
    intact (a plain str.split(',') would break them apart).

    Args:
        line: one line of text, without its trailing newline.

    Returns:
        list of str: the parsed fields; an empty list for a blank line.
    """
    reader = csv.reader(StringIO(line))
    # The builtin next() works on Python 2 and 3 (reader.next() is Python 2
    # only). The [] default keeps a blank line from leaking StopIteration:
    # raised inside a function passed to RDD.map, it can be taken as
    # "end of iterator" and silently truncate the partition instead of
    # raising an error.
    return next(reader, [])



In [155]:
# Chicago Crime dataset
# Notes:
#   1. A bug in the H2O parser (PUBDEV-2213) converts alphanumeric values to
#      missing values in the IUCR and FBI_Code columns.
#   2. District is kept as a string because it contains missing values.
# NOTE(review): absolute local path; consider a configurable data directory.
data = sc.textFile("/Users/nidhimehta/sparkling-water/examples/smalldata/chicagoCrimes10k.csv")

# Remove the header line, then parse each remaining line as CSV.
# data.first() is fetched once; the previous standalone data.first() and
# rows.first() calls each launched an extra Spark job whose result was
# discarded (they were not the cell's last expression).
header = data.first()
rows = data.filter(lambda line: line != header).map(split)

# Keep only the fields used downstream, with numeric ones cast to int:
# c[2] date, c[4] IUCR, c[5] Primary_Type, c[7] Location_Description,
# c[8] Arrest, c[9] Domestic, c[10] Beat, c[11] District, c[12] Ward,
# c[13] Community_Area, c[14] FBI_Code.
aa = rows.map(lambda c: Row(date=c[2], IUCR=c[4], Primary_Type=c[5],
                            Location_Description=c[7], Arrest=c[8],
                            Domestic=c[9], Beat=int(c[10]), District=c[11],
                            Ward=int(c[12]), Community_Area=int(c[13]),
                            FBI_Code=c[14]))
chicagoCrime = sqlContext.createDataFrame(aa)

# Create additional columns from the incident datetime:
# Day, Month, Year, HourOfDay, WeekNum
def parse_dt():
    """Build a Spark UDF that parses report date strings into timestamps.

    Input format is month/day/year with a 12-hour clock and AM/PM marker,
    e.g. '02/08/2015 11:43:40 PM'.
    """
    fmt = '%m/%d/%Y %I:%M:%S %p'
    return udf(lambda text: datetime.strptime(text, fmt), TimestampType())

# Parse the raw date string once, then derive each calendar field from the
# resulting timestamp column (columns are added in the order listed).
chicagoCrime = chicagoCrime.withColumn("timestamp", parse_dt()(chicagoCrime.date))
for new_col, extractor in [("Day", dayofmonth),
                           ("Month", month),
                           ("Year", year),
                           ("HourOfDay", hour),
                           ("WeekNum", weekofyear)]:
    chicagoCrime = chicagoCrime.withColumn(new_col, extractor(chicagoCrime.timestamp))
del new_col, extractor

#Seasons
from pyspark.sql.types import IntegerType
def season_dt():
    """Build a Spark UDF that maps a month number to a season code.

    Codes: 0 = Spring (months 3-5), 1 = Summer (6-8), 2 = Autumn (9-10),
    3 = Winter (every other value, i.e. 11, 12, 1, 2).

    NOTE(review): November lands in the Winter bucket, whereas meteorological
    autumn is Sep-Nov. Kept as-is (possibly to match an upstream feature
    definition); confirm before changing, since it alters the Season feature.
    """
    def _season(m):
        if 3 <= m <= 5:
            return 0    # Spring
        if 6 <= m <= 8:
            return 1    # Summer
        if 9 <= m <= 10:
            return 2    # Autumn
        return 3        # Winter (fallback for anything else)
    return udf(_season, IntegerType())
# Season code (0-3) derived from the Month column via the season_dt UDF.
chicagoCrime = chicagoCrime.withColumn("Season", season_dt()(chicagoCrime.Month))

#Day Of Week
from pyspark.sql.types import StringType
def reparse_dt():
    """Build a Spark UDF that renders a timestamp as a 'MM/DD/YYYY' string."""
    date_only = '%m/%d/%Y'

    def _to_date_string(ts):
        return datetime.strftime(ts, date_only)

    return udf(_to_date_string, StringType())

# Replace `date` with the date-only string (e.g. '02/08/2015'), then derive
# the English day-of-week name from it.
chicagoCrime = chicagoCrime.withColumn("date", reparse_dt()(chicagoCrime.timestamp))
# "EEEE" (exactly four letters) yields the full day name (e.g. "Sunday") with
# both the legacy SimpleDateFormat parser and Spark 3 datetime patterns. The
# previous "EEEEE" only worked with the legacy parser, because Spark 3 rejects
# five or more 'E' letters. The Weekend/WeekDay UDFs compare against these
# full names.
chicagoCrime = chicagoCrime.withColumn("dow", from_unixtime(unix_timestamp(chicagoCrime.date, "MM/dd/yyyy"), "EEEE"))

#Weekend
def is_weekend():
    """Build a Spark UDF that flags weekend days.

    Returns 1 when the day name is exactly 'Saturday' or 'Sunday', else 0.
    """
    weekend_names = ('Saturday', 'Sunday')

    def _is_weekend(day_name):
        return 1 if day_name in weekend_names else 0

    return udf(_is_weekend, IntegerType())
# 1 for Saturday/Sunday, 0 otherwise, computed from the `dow` day-name column.
chicagoCrime = chicagoCrime.withColumn("Weekend", is_weekend()(chicagoCrime.dow))

#Weekday
def weekday():
    """Build a Spark UDF that maps an English day name to a number.

    'Tuesday' -> 2, 'Wednesday' -> 3, ... 'Sunday' -> 7 (Monday = 1, ISO
    style). 'Monday' and any unrecognized value, including None, map to 1.
    """
    def _day_number(day_name):
        later_days = ('Tuesday', 'Wednesday', 'Thursday', 'Friday',
                      'Saturday', 'Sunday')
        for number, name in enumerate(later_days, 2):
            if day_name == name:
                return number
        return 1

    return udf(_day_number, IntegerType())

# Numeric day of week (Monday=1 ... Sunday=7) from the `dow` day-name column.
chicagoCrime = chicagoCrime.withColumn("WeekDay", weekday()(chicagoCrime.dow))

# Inspect two enriched rows.
print(chicagoCrime.take(2))

# Register the enriched DataFrame as table "chicagoCrime" so it can be
# queried and joined with SQL below.
sqlContext.registerDataFrameAsTable(chicagoCrime, "chicagoCrime")
# Arrest is a string column ('true'/'false'), hence the quoted comparison.
aa = sqlContext.sql("SELECT * FROM chicagoCrime WHERE Arrest = 'true'")
aa.count() # 2928 rows with an arrest in this run


[Row(Arrest=u'true', Beat=422, Community_Area=46, District=u'004', Domestic=u'false', FBI_Code=u'18', IUCR=u'1811', Location_Description=u'STREET', Primary_Type=u'NARCOTICS', Ward=7, date=u'02/08/2015', timestamp=datetime.datetime(2015, 2, 8, 23, 43, 40), Day=8, Month=2, Year=2015, HourOfDay=23, WeekNum=6, Season=3, dow=u'Sunday', Weekend=1, WeekDay=7), Row(Arrest=u'true', Beat=522, Community_Area=53, District=u'005', Domestic=u'true', FBI_Code=u'08B', IUCR=u'0486', Location_Description=u'APARTMENT', Primary_Type=u'BATTERY', Ward=34, date=u'02/08/2015', timestamp=datetime.datetime(2015, 2, 8, 23, 41, 42), Day=8, Month=2, Year=2015, HourOfDay=23, WeekNum=6, Season=3, dow=u'Sunday', Weekend=1, WeekDay=7)]


2928

In [156]:
# Chicago Census Data
# Note: the 78th (last) line was removed from the data file by hand. It has
# missing values, so the int/float casts below failed on it.
data = sc.textFile("/Users/nidhimehta/sparkling-water/examples/smalldata/chicagoCensus.csv")

# Remove the header line, then parse each remaining line as CSV.
# (Discarded data.first()/rows.first()/bb.take(1) calls removed: each ran a
# Spark job whose result was never shown.)
header = data.first()
rows = data.filter(lambda line: line != header).map(split)
print(rows.take(2))

# c[0] community area number, c[1] community area name (not kept),
# c[2]..c[8] socioeconomic indicators cast to float/int.
bb = rows.map(lambda c: Row(Community_Area_Number=int(c[0]),
                            PERCENT_OF_HOUSING_CROWDED=float(c[2]),
                            PERCENT_HOUSEHOLDS_BELOW_POVERTY=float(c[3]),
                            PERCENT_AGED_16__UNEMPLOYED=float(c[4]),
                            PERCENT_AGED_25__WITHOUT_HIGH_SCHOOL_DIPLOMA=float(c[5]),
                            PERCENT_AGED_UNDER_18_OR_OVER_64=float(c[6]),
                            PER_CAPITA_INCOME=int(c[7]),
                            HARDSHIP_INDEX=int(c[8])))

# Use the notebook's own SQLContext. `sqlCtx` is not defined anywhere in
# this notebook (it presumably came from the PySpark shell's startup
# globals) and may not be the same context that registers and joins the
# tables.
chicagoCensus = sqlContext.createDataFrame(bb)
print(chicagoCensus.take(1))
sqlContext.registerDataFrameAsTable(chicagoCensus, "chicagoCensus")
aa = sqlContext.sql("SELECT * FROM chicagoCensus")
aa.count()

[['1', 'Rogers Park', '7.7', '23.6', '8.7', '18.2', '27.5', '23939', '39'], ['2', 'West Ridge', '7.8', '17.2', '8.8', '20.8', '38.5', '23040', '46']]
[Row(Community_Area_Number=1, HARDSHIP_INDEX=39, PERCENT_AGED_16__UNEMPLOYED=8.7, PERCENT_AGED_25__WITHOUT_HIGH_SCHOOL_DIPLOMA=18.2, PERCENT_AGED_UNDER_18_OR_OVER_64=27.5, PERCENT_HOUSEHOLDS_BELOW_POVERTY=23.6, PERCENT_OF_HOUSING_CROWDED=7.7, PER_CAPITA_INCOME=23939)]


77

In [158]:
# Chicago Weather Data
data = sc.textFile("/Users/nidhimehta/sparkling-water/examples/smalldata/chicagoAllWeather.csv")

# Remove the header line, then parse each remaining line as CSV.
# (Discarded data.first()/rows.first()/rows.take(2)/cc.take(5) calls removed:
# each ran a Spark job whose result was never shown.)
header = data.first()
rows = data.filter(lambda line: line != header).map(split)

# maxTemp, meanTemp and minTemp contain missing values (empty strings), so a
# plain int() cast would fail on them; they are kept as strings for now.
cc = rows.map(lambda c: Row(month=int(c[1]), day=int(c[2]), year=int(c[3]),
                            maxTemp=c[4], meanTemp=c[5], minTemp=c[6]))

# Use the notebook's own SQLContext; `sqlCtx` is not defined in this notebook.
chicagoWeather = sqlContext.createDataFrame(cc)

# Number of missing (empty-string) values per temperature column.
print(chicagoWeather.filter(chicagoWeather.minTemp == '').count())
print(chicagoWeather.filter(chicagoWeather.meanTemp == '').count())
print(chicagoWeather.filter(chicagoWeather.maxTemp == '').count())
#toint = udf(lambda s: s, IntegerType())
#chicagoWeather = chicagoWeather.withColumn("num",toint(chicagoWeather.maxTemp))
#print(chicagoWeather.printSchema)


def prettySummary(df):
    """Return df.describe() as a pandas DataFrame with numeric statistics.

    Spark's describe() returns every statistic as a string. The rows at
    positions 1-4 (mean, stddev, min, max) are converted to numbers so pandas
    can format them. The count row (position 0) and the first column (the
    'summary' labels) are left as they are.

    Side effect: sets the global pandas display float format to two decimals
    with thousands separators.

    Args:
        df: a Spark DataFrame (anything exposing describe().toPandas()).

    Returns:
        pandas.DataFrame: the summary table.
    """
    import pandas as pd
    temp = df.describe().toPandas()
    # DataFrame.convert_objects was deprecated in pandas 0.17 and removed in
    # 1.0. pd.to_numeric(errors='coerce') is the supported replacement and,
    # like convert_numeric=True, turns unparseable entries into NaN.
    temp.iloc[1:5, 1:] = temp.iloc[1:5, 1:].apply(pd.to_numeric, errors='coerce')
    pd.options.display.float_format = '{:,.2f}'.format
    return temp

# Summary statistics of the weather DataFrame. Only day/month/year appear in
# the output below; the temperature columns are still strings here.
print(prettySummary(chicagoWeather))
# Exploratory leftovers (not executed). NOTE(review): convert_objects is a
# pandas method, so these lines would not work on the Spark DataFrame
# chicagoWeather as written.
#chicagoWeather.dtypes
#chicagoWeather.convert_objects(convert_numeric=True)
#chicagoWeather.convert_objects(convert_numeric=True).dtypes
#chicagoWeather.convert_objects(convert_numeric=True)
#print(chicagoWeather.take(2))
#chicagoWeather = chicagoWeather.dropna()
# Register as table "chicagoWeather" for the SQL join further down.
sqlContext.registerDataFrameAsTable(chicagoWeather, "chicagoWeather")
aa = sqlContext.sql("SELECT * FROM chicagoWeather")
aa.count()


13
13
13
  summary   day month     year
0   count  5162  5162     5162
1    mean 15.71  6.47 2,007.57
2  stddev  8.80  3.47     4.08
3     min  1.00  1.00 2,001.00
4     max 31.00 12.00 2,015.00


5162

In [159]:
# Join each crime with the weather for its calendar day (Year/Month/Day) and
# with the census indicators for its community area. Both are inner joins, so
# crimes with no matching weather day or community area are dropped.
# Note: minTemp/maxTemp/meanTemp are still strings at this point.
crimeWeather = sqlContext.sql(
    "SELECT \
    a.Year, a.Month, a.Day, a.HourOfDay, a.Weekend, a.Season, a.WeekDay, a.WeekNum,\
    a.IUCR, a.Primary_Type, a.Location_Description, a.Community_Area, a.District,\
    a.Arrest, a.Domestic, a.Beat, a.Ward, a.FBI_Code,\
    b.minTemp, b.maxTemp, b.meanTemp,\
    c.PERCENT_AGED_UNDER_18_OR_OVER_64, c.PER_CAPITA_INCOME, c.HARDSHIP_INDEX,\
    c.PERCENT_OF_HOUSING_CROWDED, c.PERCENT_HOUSEHOLDS_BELOW_POVERTY,\
    c.PERCENT_AGED_16__UNEMPLOYED, c.PERCENT_AGED_25__WITHOUT_HIGH_SCHOOL_DIPLOMA\
    FROM chicagoCrime a\
    JOIN chicagoWeather b\
    ON a.Year = b.year AND a.Month = b.month AND a.Day = b.day\
    JOIN chicagoCensus c\
    ON a.Community_Area = c.Community_Area_Number")


In [161]:
# Peek at the first joined row.
crimeWeather.first()

Row(Year=2015, Month=1, Day=23, HourOfDay=22, Weekend=0, Season=3, WeekDay=5, WeekNum=4, IUCR=u'143A', Primary_Type=u'WEAPONS VIOLATION', Location_Description=u'ALLEY', Community_Area=31, District=u'012', Arrest=u'true', Domestic=u'false', Beat=1234, Ward=25, FBI_Code=u'15', minTemp=u'29', maxTemp=u'31', meanTemp=u'30', PERCENT_AGED_UNDER_18_OR_OVER_64=32.6, PER_CAPITA_INCOME=16444, HARDSHIP_INDEX=76, PERCENT_OF_HOUSING_CROWDED=9.6, PERCENT_HOUSEHOLDS_BELOW_POVERTY=25.8, PERCENT_AGED_16__UNEMPLOYED=15.8, PERCENT_AGED_25__WITHOUT_HIGH_SCHOOL_DIPLOMA=40.7)

In [162]:
# Column names and types of the joined DataFrame.
crimeWeather.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- HourOfDay: integer (nullable = true)
 |-- Weekend: integer (nullable = true)
 |-- Season: integer (nullable = true)
 |-- WeekDay: integer (nullable = true)
 |-- WeekNum: integer (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary_Type: string (nullable = true)
 |-- Location_Description: string (nullable = true)
 |-- Community_Area: long (nullable = true)
 |-- District: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: long (nullable = true)
 |-- Ward: long (nullable = true)
 |-- FBI_Code: string (nullable = true)
 |-- minTemp: string (nullable = true)
 |-- maxTemp: string (nullable = true)
 |-- meanTemp: string (nullable = true)
 |-- PERCENT_AGED_UNDER_18_OR_OVER_64: double (nullable = true)
 |-- PER_CAPITA_INCOME: long (nullable = true)
 |-- HARDSHIP_INDEX: long (nullable = true)
 |-- PER

In [None]:
#toint = udf(lambda s: s, IntegerType())
#chicagoWeather = chicagoWeather.withColumn("num",toint(chicagoWeather.maxTemp))

#def toInt():
#    def _toInt(dt):
#        return dt
#    return udf(_toInt, IntegerType())

#chicagoWeather = chicagoWeather.withColumn("MaxT", toInt()(chicagoWeather.maxTemp))

