In [1]:
import pyspark as ps
from pyspark.sql import SQLContext
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import CountVectorizer, Tokenizer
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import mean_squared_log_error
from sklearn.linear_model import SGDRegressor
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import cross_val_score
import time

spark = (
        ps.sql.SparkSession.builder 
        .master("justin") 
        .appName("chicago") 
        .getOrCreate()
        )

sc = spark.sparkContext

# random.seed(1)

# def sample(p):
#     x, y = random.random(), random.random()
#     return 1 if x*x + y*y < 1 else 0

# count = spark.sparkContext.parallelize(range(0, 10000000)).map(sample) \
#              .reduce(lambda a, b: a + b)

# print("Pi is (very) roughly {}".format(4.0 * count / 10000000))

In [5]:
import pandas as pd
import pymysql


In [3]:
df_sp = spark.read.csv('/home/justin/Downloads/bq-results-20190515-182310-9jmw514pxnfg.csv', header='true',
                       inferSchema='true')

In [4]:
df_pd = df_sp.select('unique_key','arrest','block','community_area','district','latitude','longitude').toPandas()

In [5]:
train = df_pd[df_pd.longitude.isnull()==False]
test = df_pd[df_pd.longitude.isnull()==True]

In [6]:
X_train = train[train.columns.difference(['arrest','longitude'])].block.values
y_train = train['longitude']

In [7]:
vect = TfidfVectorizer()
X_train_vect=vect.fit_transform(X_train)

In [8]:
model = SGDRegressor(loss="squared_loss", penalty='l2', random_state=101, max_iter=5)
params= {'penalty':['none','l2','l1'],
        'alpha':[5e-4,5e-2, .1]}
gs=GridSearchCV(estimator=model,
               param_grid=params,
               scoring='neg_mean_squared_error',
               n_jobs=1,
               cv=5,
               verbose=3)
start=time.time()
gs.fit(X_train_vect, y_train)
end=time.time()
print("Time it took {}".format(end-start))

Fitting 5 folds for each of 9 candidates, totalling 45 fits
[CV] alpha=0.0005, penalty=none ......................................


[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.


[CV]  alpha=0.0005, penalty=none, score=-32.167207809182436, total=   2.8s
[CV] alpha=0.0005, penalty=none ......................................


[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    2.8s remaining:    0.0s


[CV]  alpha=0.0005, penalty=none, score=-13.481711735477946, total=   2.4s
[CV] alpha=0.0005, penalty=none ......................................


[Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:    5.3s remaining:    0.0s


[CV]  alpha=0.0005, penalty=none, score=-10.101654106961654, total=   2.4s
[CV] alpha=0.0005, penalty=none ......................................




[CV]  alpha=0.0005, penalty=none, score=-8.201825123968378, total=   2.5s
[CV] alpha=0.0005, penalty=none ......................................




[CV]  alpha=0.0005, penalty=none, score=-10.27146933032898, total=   2.5s
[CV] alpha=0.0005, penalty=l2 ........................................




[CV]  alpha=0.0005, penalty=l2, score=-23.11658016309282, total=   2.5s
[CV] alpha=0.0005, penalty=l2 ........................................




[CV]  alpha=0.0005, penalty=l2, score=-9.543591191543264, total=   2.5s
[CV] alpha=0.0005, penalty=l2 ........................................




[CV]  alpha=0.0005, penalty=l2, score=-6.7123857606828965, total=   2.7s
[CV] alpha=0.0005, penalty=l2 ........................................




[CV]  alpha=0.0005, penalty=l2, score=-5.3860234074856805, total=   2.5s
[CV] alpha=0.0005, penalty=l2 ........................................




[CV]  alpha=0.0005, penalty=l2, score=-7.903991224251152, total=   2.4s
[CV] alpha=0.0005, penalty=l1 ........................................




[CV]  alpha=0.0005, penalty=l1, score=-33.71867745398241, total=   2.5s
[CV] alpha=0.0005, penalty=l1 ........................................




[CV]  alpha=0.0005, penalty=l1, score=-14.344072842104014, total=   2.6s
[CV] alpha=0.0005, penalty=l1 ........................................




[CV]  alpha=0.0005, penalty=l1, score=-10.705373032447117, total=   2.6s
[CV] alpha=0.0005, penalty=l1 ........................................




[CV]  alpha=0.0005, penalty=l1, score=-8.60032156005457, total=   2.6s
[CV] alpha=0.0005, penalty=l1 ........................................




[CV]  alpha=0.0005, penalty=l1, score=-10.661111918890528, total=   2.6s
[CV] alpha=0.05, penalty=none ........................................




[CV]  alpha=0.05, penalty=none, score=-32.167207809182436, total=   2.4s
[CV] alpha=0.05, penalty=none ........................................




[CV]  alpha=0.05, penalty=none, score=-13.481711735477946, total=   2.4s
[CV] alpha=0.05, penalty=none ........................................




[CV]  alpha=0.05, penalty=none, score=-10.101654106961654, total=   2.4s
[CV] alpha=0.05, penalty=none ........................................




[CV]  alpha=0.05, penalty=none, score=-8.201825123968378, total=   2.4s
[CV] alpha=0.05, penalty=none ........................................




[CV]  alpha=0.05, penalty=none, score=-10.27146933032898, total=   2.6s
[CV] alpha=0.05, penalty=l2 ..........................................




[CV]  alpha=0.05, penalty=l2, score=-0.004358911567627504, total=   2.7s
[CV] alpha=0.05, penalty=l2 ..........................................




[CV]  alpha=0.05, penalty=l2, score=-0.004642637044258677, total=   2.8s
[CV] alpha=0.05, penalty=l2 ..........................................




[CV]  alpha=0.05, penalty=l2, score=-0.004106947405036811, total=   2.7s
[CV] alpha=0.05, penalty=l2 ..........................................




[CV]  alpha=0.05, penalty=l2, score=-0.0035433966842109395, total=   2.7s
[CV] alpha=0.05, penalty=l2 ..........................................




[CV]  alpha=0.05, penalty=l2, score=-0.004464780902546208, total=   2.7s
[CV] alpha=0.05, penalty=l1 ..........................................




[CV] .. alpha=0.05, penalty=l1, score=-43.5282992347541, total=   2.7s
[CV] alpha=0.05, penalty=l1 ..........................................




[CV] . alpha=0.05, penalty=l1, score=-22.43183357697792, total=   2.7s
[CV] alpha=0.05, penalty=l1 ..........................................




[CV] .. alpha=0.05, penalty=l1, score=-18.1698851514693, total=   2.7s
[CV] alpha=0.05, penalty=l1 ..........................................




[CV]  alpha=0.05, penalty=l1, score=-13.988038106474546, total=   2.8s
[CV] alpha=0.05, penalty=l1 ..........................................




[CV]  alpha=0.05, penalty=l1, score=-11.776647780220673, total=   2.7s
[CV] alpha=0.1, penalty=none .........................................




[CV]  alpha=0.1, penalty=none, score=-32.167207809182436, total=   2.4s
[CV] alpha=0.1, penalty=none .........................................




[CV]  alpha=0.1, penalty=none, score=-13.481711735477946, total=   2.4s
[CV] alpha=0.1, penalty=none .........................................




[CV]  alpha=0.1, penalty=none, score=-10.101654106961654, total=   2.8s
[CV] alpha=0.1, penalty=none .........................................




[CV]  alpha=0.1, penalty=none, score=-8.201825123968378, total=   2.6s
[CV] alpha=0.1, penalty=none .........................................




[CV]  alpha=0.1, penalty=none, score=-10.27146933032898, total=   2.9s
[CV] alpha=0.1, penalty=l2 ...........................................




[CV]  alpha=0.1, penalty=l2, score=-0.0044514925662275575, total=   2.9s
[CV] alpha=0.1, penalty=l2 ...........................................




[CV]  alpha=0.1, penalty=l2, score=-0.0047616822964582015, total=   2.8s
[CV] alpha=0.1, penalty=l2 ...........................................




[CV]  alpha=0.1, penalty=l2, score=-0.004221581448020065, total=   2.7s
[CV] alpha=0.1, penalty=l2 ...........................................




[CV]  alpha=0.1, penalty=l2, score=-0.0036517433257068324, total=   2.7s
[CV] alpha=0.1, penalty=l2 ...........................................




[CV]  alpha=0.1, penalty=l2, score=-0.004572014222590861, total=   2.7s
[CV] alpha=0.1, penalty=l1 ...........................................




[CV] .. alpha=0.1, penalty=l1, score=-28.42529287455159, total=   2.7s
[CV] alpha=0.1, penalty=l1 ...........................................




[CV] .. alpha=0.1, penalty=l1, score=-17.45309182831349, total=   2.8s
[CV] alpha=0.1, penalty=l1 ...........................................




[CV] . alpha=0.1, penalty=l1, score=-13.701312596611986, total=   3.0s
[CV] alpha=0.1, penalty=l1 ...........................................




[CV] . alpha=0.1, penalty=l1, score=-10.684420271524422, total=   2.9s
[CV] alpha=0.1, penalty=l1 ...........................................




[CV] .. alpha=0.1, penalty=l1, score=-9.833860093453168, total=   2.9s


[Parallel(n_jobs=1)]: Done  45 out of  45 | elapsed:  2.0min finished


Time it took 123.41386270523071


In [9]:
model = gs.best_estimator_
pipe = Pipeline([('vect',vect),('model',model)])
y_pred = pipe.predict(test.block.values)

In [12]:
for idx, val in enumerate(test.index):
    df_pd.iloc[val,5] = y_pred[idx] 

In [13]:
train_lat = df_pd[df_pd.latitude.isnull()==False]
test_lat = df_pd[df_pd.latitude.isnull()==True]

In [14]:
X_train_lat = train_lat[train_lat.columns.difference(['arrest','latitude'])].block.values
y_train_lat = train_lat['latitude']

In [15]:
vect_lat = TfidfVectorizer()
X_train_vect_lat=vect_lat.fit_transform(X_train_lat)

In [16]:
model = SGDRegressor(loss="squared_loss", penalty='l2', random_state=101, max_iter=5)
params= {'penalty':['none','l2','l1'],
        'alpha':[5e-4,5e-2, .1]}
gs=GridSearchCV(estimator=model,
               param_grid=params,
               scoring='neg_mean_squared_error',
               n_jobs=1,
               cv=5,
               verbose=3)
start=time.time()
gs.fit(X_train_vect_lat, y_train_lat)
end=time.time()
print("Time it took {}".format(end-start))

Fitting 5 folds for each of 9 candidates, totalling 45 fits
[CV] alpha=0.0005, penalty=none ......................................


[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.


[CV]  alpha=0.0005, penalty=none, score=-7.427539414606433, total=   2.5s
[CV] alpha=0.0005, penalty=none ......................................


[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    2.6s remaining:    0.0s


[CV]  alpha=0.0005, penalty=none, score=-3.0938211888765483, total=   2.4s
[CV] alpha=0.0005, penalty=none ......................................


[Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:    5.0s remaining:    0.0s


[CV]  alpha=0.0005, penalty=none, score=-2.296869684308005, total=   2.5s
[CV] alpha=0.0005, penalty=none ......................................




[CV]  alpha=0.0005, penalty=none, score=-1.8567781708745184, total=   2.4s
[CV] alpha=0.0005, penalty=none ......................................




[CV]  alpha=0.0005, penalty=none, score=-2.3050106305762306, total=   2.4s
[CV] alpha=0.0005, penalty=l2 ........................................




[CV]  alpha=0.0005, penalty=l2, score=-5.379678529063455, total=   2.5s
[CV] alpha=0.0005, penalty=l2 ........................................




[CV]  alpha=0.0005, penalty=l2, score=-2.207117914995905, total=   2.5s
[CV] alpha=0.0005, penalty=l2 ........................................




[CV]  alpha=0.0005, penalty=l2, score=-1.5186900148816005, total=   2.5s
[CV] alpha=0.0005, penalty=l2 ........................................




[CV]  alpha=0.0005, penalty=l2, score=-1.2066563986259513, total=   2.5s
[CV] alpha=0.0005, penalty=l2 ........................................




[CV]  alpha=0.0005, penalty=l2, score=-1.7417584963399406, total=   2.5s
[CV] alpha=0.0005, penalty=l1 ........................................




[CV]  alpha=0.0005, penalty=l1, score=-8.096118377989951, total=   2.7s
[CV] alpha=0.0005, penalty=l1 ........................................




[CV]  alpha=0.0005, penalty=l1, score=-3.5045117044802336, total=   2.6s
[CV] alpha=0.0005, penalty=l1 ........................................




[CV]  alpha=0.0005, penalty=l1, score=-2.590019274784618, total=   2.6s
[CV] alpha=0.0005, penalty=l1 ........................................




[CV]  alpha=0.0005, penalty=l1, score=-2.05828144672879, total=   2.6s
[CV] alpha=0.0005, penalty=l1 ........................................




[CV]  alpha=0.0005, penalty=l1, score=-2.4906577473131786, total=   2.6s
[CV] alpha=0.05, penalty=none ........................................




[CV]  alpha=0.05, penalty=none, score=-7.427539414606433, total=   2.4s
[CV] alpha=0.05, penalty=none ........................................




[CV]  alpha=0.05, penalty=none, score=-3.0938211888765483, total=   2.5s
[CV] alpha=0.05, penalty=none ........................................




[CV]  alpha=0.05, penalty=none, score=-2.296869684308005, total=   2.5s
[CV] alpha=0.05, penalty=none ........................................




[CV]  alpha=0.05, penalty=none, score=-1.8567781708745184, total=   2.5s
[CV] alpha=0.05, penalty=none ........................................




[CV]  alpha=0.05, penalty=none, score=-2.3050106305762306, total=   2.5s
[CV] alpha=0.05, penalty=l2 ..........................................




[CV]  alpha=0.05, penalty=l2, score=-0.015211161664245734, total=   2.5s
[CV] alpha=0.05, penalty=l2 ..........................................




[CV]  alpha=0.05, penalty=l2, score=-0.009605889959083312, total=   2.7s
[CV] alpha=0.05, penalty=l2 ..........................................




[CV]  alpha=0.05, penalty=l2, score=-0.004508851940491529, total=   2.7s
[CV] alpha=0.05, penalty=l2 ..........................................




[CV]  alpha=0.05, penalty=l2, score=-0.0069987930801819346, total=   2.7s
[CV] alpha=0.05, penalty=l2 ..........................................




[CV]  alpha=0.05, penalty=l2, score=-0.013178492311095169, total=   2.7s
[CV] alpha=0.05, penalty=l1 ..........................................




[CV] . alpha=0.05, penalty=l1, score=-6.252976193924272, total=   3.0s
[CV] alpha=0.05, penalty=l1 ..........................................




[CV] . alpha=0.05, penalty=l1, score=-3.834758856232307, total=   3.0s
[CV] alpha=0.05, penalty=l1 ..........................................




[CV]  alpha=0.05, penalty=l1, score=-3.0291979012193426, total=   2.8s
[CV] alpha=0.05, penalty=l1 ..........................................




[CV]  alpha=0.05, penalty=l1, score=-2.3412110501559065, total=   2.8s
[CV] alpha=0.05, penalty=l1 ..........................................




[CV]  alpha=0.05, penalty=l1, score=-2.1170639660660697, total=   2.8s
[CV] alpha=0.1, penalty=none .........................................




[CV]  alpha=0.1, penalty=none, score=-7.427539414606433, total=   2.7s
[CV] alpha=0.1, penalty=none .........................................




[CV]  alpha=0.1, penalty=none, score=-3.0938211888765483, total=   2.7s
[CV] alpha=0.1, penalty=none .........................................




[CV]  alpha=0.1, penalty=none, score=-2.296869684308005, total=   2.7s
[CV] alpha=0.1, penalty=none .........................................




[CV]  alpha=0.1, penalty=none, score=-1.8567781708745184, total=   2.7s
[CV] alpha=0.1, penalty=none .........................................




[CV]  alpha=0.1, penalty=none, score=-2.3050106305762306, total=   2.7s
[CV] alpha=0.1, penalty=l2 ...........................................




[CV]  alpha=0.1, penalty=l2, score=-0.015476774448514759, total=   2.7s
[CV] alpha=0.1, penalty=l2 ...........................................




[CV]  alpha=0.1, penalty=l2, score=-0.009867378476565233, total=   2.7s
[CV] alpha=0.1, penalty=l2 ...........................................




[CV]  alpha=0.1, penalty=l2, score=-0.004630585162670053, total=   2.7s
[CV] alpha=0.1, penalty=l2 ...........................................




[CV]  alpha=0.1, penalty=l2, score=-0.007237129166065111, total=   2.7s
[CV] alpha=0.1, penalty=l2 ...........................................




[CV]  alpha=0.1, penalty=l2, score=-0.01356167882328304, total=   2.7s
[CV] alpha=0.1, penalty=l1 ...........................................




[CV] . alpha=0.1, penalty=l1, score=-0.8995135414392518, total=   2.8s
[CV] alpha=0.1, penalty=l1 ...........................................




[CV] . alpha=0.1, penalty=l1, score=-0.6946343660727693, total=   2.8s
[CV] alpha=0.1, penalty=l1 ...........................................




[CV] . alpha=0.1, penalty=l1, score=-0.6068782234205196, total=   2.8s
[CV] alpha=0.1, penalty=l1 ...........................................




[CV] . alpha=0.1, penalty=l1, score=-0.5364135631959327, total=   2.8s
[CV] alpha=0.1, penalty=l1 ...........................................




[CV] . alpha=0.1, penalty=l1, score=-0.4528837360798714, total=   2.8s


[Parallel(n_jobs=1)]: Done  45 out of  45 | elapsed:  2.0min finished


Time it took 123.58108019828796


In [17]:
model1 = gs.best_estimator_
pipe = Pipeline([('vect',vect_lat),('model',model1)])
y_pred = pipe.predict(test_lat.block.values)

In [18]:
for idx, val in enumerate(test_lat.index):
    df_pd.iloc[val,4] = y_pred[idx] 


In [19]:
df_pd.head()

Unnamed: 0,arrest,block,community_area,district,latitude,longitude
0,True,074XX N OLCOTT AVE,9.0,16.0,42.015614,-87.813912
1,True,051XX S PULASKI RD,62.0,8.0,41.800465,-87.72337
2,False,048XX S KARLOV AVE,57.0,8.0,41.80546,-87.725964
3,True,063XX S NASHVILLE AVE,64.0,8.0,41.777312,-87.786139
4,False,112XX S ELLIS AVE,50.0,5.0,41.690657,-87.605164


In [20]:
df_pd.to_pickle("./chicago.pkl")

In [6]:
unpickled_df = pd.read_pickle("./chicago.pkl")


In [7]:
unpickled_df = pd.concat([df_pd.unique_key, unpickled_df], axis=1, sort=False)

In [9]:
unpickled_df = unpickled_df.rename(index=str, columns={'latitude':'Latitude1','longitude':'Longitude1'})

In [10]:
df_sp2 = spark.createDataFrame(unpickled_df)

In [11]:
df = df_sp.join(df_sp2,['unique_key'],"inner")

In [12]:
df.show(5,False)

+----------+-----------+-----------------------+-----------------------+----+------------+-------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-----------------------+------------+-------------+-----------------------------+------+-----------------------+--------------+--------+------------+-------------+
|unique_key|case_number|date                   |block                  |iucr|primary_type|description        |location_description|arrest|domestic|beat|district|ward|community_area|fbi_code|x_coordinate|y_coordinate|year|updated_on             |latitude    |longitude    |location                     |arrest|block                  |community_area|district|Latitude1   |Longitude1   |
+----------+-----------+-----------------------+-----------------------+----+------------+-------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+---------

In [6]:
df_pd = df_sp.select('date','primary_type','domestic','year').toPandas()

In [13]:
from pyspark.sql.functions import to_timestamp,year, month, dayofmonth, hour,when 
from pyspark.sql.functions import lit

df_sp_dt = df.withColumn('dt',to_timestamp(df_sp.date,'yyyy-MM-dd HH:mm:ss'))\
        .withColumn('Year',year(col("dt")))\
        .withColumn('Month',month(col("dt")))\
        .withColumn('Hour',hour(col("dt")))\
        .withColumn('Night?', when(col('Hour')>=17,1).otherwise(0))

In [17]:
df_sp_dt.rdd.saveAsPickleFile("./chicago1.pkl")

pickleRdd = sc.pickleFile("./chicago1.pkl").collect()
df2 = spark.createDataFrame(pickleRdd)

In [18]:
from pyspark.ml.feature import HashingTF,IDF, Tokenizer
tokenizer = Tokenizer(inputCol="primary_type", outputCol="words")
wordsData = tokenizer.transform(df_sp_dt)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [22]:
rescaledData.show(20,False)

+----------+-----------+-----------------------+-----------------------+----+-------------------+---------------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-----------------------+------------+-------------+-----------------------------+------+-----------------------+--------------+--------+------------------+-----------------+-------------------+-----+----+------+-----------------------+----------------------------+-----------------------------------------------------------------------+
|unique_key|case_number|date                   |block                  |iucr|primary_type       |description                |location_description|arrest|domestic|beat|district|ward|community_area|fbi_code|x_coordinate|y_coordinate|Year|updated_on             |latitude    |longitude    |location                     |arrest|block                  |community_area|district|Latitude1         |Longitude1       |dt                

In [None]:
rescaledData