# Алексеев Артём и Бикбулатов Равиль
# Группа ИАД 19

# Инициализация PySpark
PySpark был установлен [так](https://medium.com/@naomi.fridman/install-pyspark-to-run-on-jupyter-notebook-on-windows-4ec2009de21f)

In [1]:
import findspark
findspark.init()

In [2]:
findspark.find()

'D:\\Spark\\spark-2.4.4-bin-hadoop2.7'

In [3]:
import pyspark
findspark.find()

'D:\\Spark\\spark-2.4.4-bin-hadoop2.7'

In [4]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import rand
import databricks.koalas as ks
import numpy as np
import pandas_profiling

# Инициализация SparkContext

In [5]:
conf = SparkConf().setAppName('taxi').setMaster('local[1]')
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)

# Задача
Датасет – New York Taxi  
Спрогнозировать цену поездки
# Данные
[Cсылка](https://www.kaggle.com/c/new-york-city-taxi-fare-prediction/data) на данные  
Был взят датасет train и обрезан до 20 мегабайт, чтобы это всё быстро работало на ноутбуке

# Чтение данных

In [6]:
sdf = sql_context.read.csv('.../train_cut.csv', header=True, inferSchema=True)

In [7]:
sdf.printSchema()

root
 |-- key: timestamp (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



# Предобработка данных

**Переводим спарковский датафрейм в коаласовский датафрейм для удобства работы**

In [8]:
kdf = sdf.to_koalas()

In [9]:
type(kdf)

databricks.koalas.frame.DataFrame

In [10]:
kdf.head(16)

Unnamed: 0,key,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,2009-06-15 17:26:21,4.5,2009-06-15 17:26:21 UTC,-73.844311,40.721319,-73.84161,40.712278,1
1,2010-01-05 16:52:16,16.9,2010-01-05 16:52:16 UTC,-74.016048,40.711303,-73.979268,40.782004,1
2,2011-08-18 00:35:00,5.7,2011-08-18 00:35:00 UTC,-73.982738,40.76127,-73.991242,40.750562,2
3,2012-04-21 04:30:42,7.7,2012-04-21 04:30:42 UTC,-73.98713,40.733143,-73.991567,40.758092,1
4,2010-03-09 07:51:00,5.3,2010-03-09 07:51:00 UTC,-73.968095,40.768008,-73.956655,40.783762,1
5,2011-01-06 09:50:45,12.1,2011-01-06 09:50:45 UTC,-74.000964,40.73163,-73.972892,40.758233,1
6,2012-11-20 20:35:00,7.5,2012-11-20 20:35:00 UTC,-73.980002,40.751662,-73.973802,40.764842,1
7,2012-01-04 17:22:00,16.5,2012-01-04 17:22:00 UTC,-73.9513,40.774138,-73.990095,40.751048,1
8,2012-12-03 13:10:00,9.0,2012-12-03 13:10:00 UTC,-74.006462,40.726713,-73.993078,40.731628,1
9,2009-09-02 01:11:00,8.9,2009-09-02 01:11:00 UTC,-73.980658,40.733873,-73.99154,40.758138,2


**Запустим pandas profiling на данных** (файлы с отчётами приложены отдельно)

In [11]:
#profiling_df = kdf.to_pandas()
#type(profiling_df)

pandas.core.frame.DataFrame

In [13]:
#profile = profiling_df.profile_report(title='Initial Pandas Profiling Report')
#profile.to_file(output_file="InitialReport.html")
#print("Done")

После выполнения профайлера видно, что в датасете присутсвуют сэмплы с нулевыми координатами и есть небольшое количество выбросов в координатах и цене. Столбцы *pickup_longitude* и *dropoff_longitude* сильно коррелированы

**Метод очистки данных от выбросов в координатах, цене и избавление от корреляции**

In [10]:
def clean(df):
    df = df[(-76 <= df['pickup_longitude']) & (df['pickup_longitude'] <= -72)]
    df = df[(-76 <= df['dropoff_longitude']) & (df['dropoff_longitude'] <= -72)]
    df = df[(38 <= df['pickup_latitude']) & (df['pickup_latitude'] <= 42)]
    df = df[(38 <= df['dropoff_latitude']) & (df['dropoff_latitude'] <= 42)]
    df = df[(0 < df['fare_amount']) & (df['fare_amount'] <= 250)]
    df = df[(df['dropoff_longitude'] != df['pickup_longitude'])]
    df = df[(df['dropoff_latitude'] != df['pickup_latitude'])]
    
    return df

In [12]:
kdf = clean(kdf)

In [14]:
profiling_df1 = kdf.to_pandas()
type(profiling_df1)

pandas.core.frame.DataFrame

In [15]:
#profile1 = profiling_df1.profile_report(title='After Clean Pandas Profiling Report')
#profile1.to_file(output_file="AfterReport.html")
#print("Done")

Done


# Посмотрим на данные на карте. Для этого подключаем и настраиваем [gmaps](https://jupyter-gmaps.readthedocs.io/en/latest/index.html)
Карта подгружается динамически из интернета, поэтому на всякий случай есть скриншот карты

In [16]:
import gmaps
gmaps.configure(api_key="AIzaSyDmO6Rz3FfJrFAOvDNWx_mgHIswxZu1AMQ")

In [17]:
df1 = kdf[['pickup_latitude', 'pickup_longitude']]
df1.columns = ['latitude', 'longitude']
df2 = kdf[['dropoff_latitude', 'dropoff_longitude']]
df2.columns = ['latitude', 'longitude']
locations = ks.concat([df1, df2])
locations = locations.toPandas()
locations.head()

Unnamed: 0,latitude,longitude
0,40.721319,-73.844311
1,40.711303,-74.016048
2,40.76127,-73.982738
3,40.733143,-73.98713
4,40.768008,-73.968095


In [18]:
figure_layout = {'width': '800px', 'height': '500px', 'margin': '0 auto 0 auto'}
new_york_coordinates = (40.75, -73.90)
fig = gmaps.figure(center=new_york_coordinates, zoom_level=11, layout=figure_layout, map_type='ROADMAP')
heatmap_layer = gmaps.heatmap_layer(locations)
heatmap_layer.opacity = 0.7
heatmap_layer.max_intensity = 100
heatmap_layer.point_radius = 3
fig.add_layer(heatmap_layer)
fig

A Jupyter Widget

**Красным цветом показаны самые частые места заказов и окончания поездок на такси**

![heatmap.PNG](attachment:heatmap.PNG)

**Методы для рассчёта рассчёта манхэттоновского расстояния до трёх точек, в которых наблюдается самая большая концентрация заказов и конечных точек поездок на такси**  
**Также метод для парсинга даты**

In [19]:
def manhattan(pickup_lat, pickup_long, dropoff_lat, dropoff_long):
    return np.abs(dropoff_lat - pickup_lat) + np.abs(dropoff_long - pickup_long)

def add_time_features(df):
    df['pickup_datetime'] =  ks.to_datetime(df['pickup_datetime'], format="%Y-%m-%d %H:%M:%S %Z")
    df['year'] = df['pickup_datetime'].dt.year
    df['month'] = df['pickup_datetime'].dt.month
    df['day'] = df['pickup_datetime'].dt.day
    df['hour'] = df['pickup_datetime'].dt.hour
    df['weekday'] = df['pickup_datetime'].dt.weekday
    df = df.drop('pickup_datetime', axis=1)
    
    return df

def add_distances_features(df):
    ny = (-74.0063889, 40.7141667)
    jfk = (-73.7822222222, 40.6441666667)
    lgr = (-73.87, 40.77)
    
    lat1 = df['pickup_latitude']
    lat2 = df['dropoff_latitude']
    lon1 = df['pickup_longitude']
    lon2 = df['dropoff_longitude']
    
    df['manhattan'] = manhattan(lat1, lon1, lat2, lon2)
    
    df['downtown_pickup_distance'] = manhattan(ny[1], ny[0], lat1, lon1)
    df['downtown_dropoff_distance'] = manhattan(ny[1], ny[0], lat2, lon2)
    df['jfk_pickup_distance'] = manhattan(jfk[1], jfk[0], lat1, lon1)
    df['jfk_dropoff_distance'] = manhattan(jfk[1], jfk[0], lat2, lon2)
    df['lgr_pickup_distance'] = manhattan(lgr[1], lgr[0], lat1, lon1)
    df['lgr_dropoff_distance'] = manhattan(lgr[1], lgr[0], lat2, lon2)
    
    return df

In [20]:
kdf = add_time_features(kdf)

kdf = add_distances_features(kdf)

In [21]:
kdf.head()

Unnamed: 0,key,fare_amount,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,year,month,day,hour,weekday,manhattan,downtown_pickup_distance,downtown_dropoff_distance,jfk_pickup_distance,jfk_dropoff_distance,lgr_pickup_distance,lgr_dropoff_distance
0,2009-06-15 17:26:21,4.5,-73.844311,40.721319,-73.84161,40.712278,1,2009,6,15,21,0,0.011742,0.16923,0.166668,0.139241,0.127499,0.07437,0.086112
1,2010-01-05 16:52:16,16.9,-74.016048,40.711303,-73.979268,40.782004,1,2010,1,5,19,1,0.107481,0.012523,0.094958,0.300962,0.334883,0.204745,0.121272
2,2011-08-18 00:35:00,5.7,-73.982738,40.76127,-73.991242,40.750562,2,2011,8,18,4,3,0.019212,0.070754,0.051542,0.317619,0.315415,0.121468,0.14068
3,2012-04-21 04:30:42,7.7,-73.98713,40.733143,-73.991567,40.758092,1,2012,4,21,8,5,0.029386,0.038235,0.058747,0.293884,0.32327,0.153987,0.133475
4,2010-03-09 07:51:00,5.3,-73.968095,40.768008,-73.956655,40.783762,1,2010,3,9,10,1,0.027194,0.092135,0.119329,0.309714,0.314028,0.100087,0.100417


**Удаляем столбцы из которых выделили расстояния**

In [22]:
dropped_columns = ['pickup_longitude', 'pickup_latitude', 
                   'dropoff_longitude', 'dropoff_latitude']
kdf_clean = kdf.drop(dropped_columns, axis=1)

**Преобразуем обратно в спарковский датафрейм для того, чтобы применить к данным спарковский пайплайн**

In [23]:
spark_df = kdf_clean.to_spark()

In [24]:
spark_df.printSchema()

root
 |-- key: timestamp (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- year: long (nullable = true)
 |-- month: long (nullable = true)
 |-- day: long (nullable = true)
 |-- hour: long (nullable = true)
 |-- weekday: long (nullable = true)
 |-- manhattan: double (nullable = true)
 |-- downtown_pickup_distance: double (nullable = true)
 |-- downtown_dropoff_distance: double (nullable = true)
 |-- jfk_pickup_distance: double (nullable = true)
 |-- jfk_dropoff_distance: double (nullable = true)
 |-- lgr_pickup_distance: double (nullable = true)
 |-- lgr_dropoff_distance: double (nullable = true)



In [25]:
spark_df.limit(5).to_koalas()

Unnamed: 0,key,fare_amount,passenger_count,year,month,day,hour,weekday,manhattan,downtown_pickup_distance,downtown_dropoff_distance,jfk_pickup_distance,jfk_dropoff_distance,lgr_pickup_distance,lgr_dropoff_distance
0,2009-06-15 17:26:21,4.5,1,2009,6,15,21,0,0.011742,0.16923,0.166668,0.139241,0.127499,0.07437,0.086112
1,2010-01-05 16:52:16,16.9,1,2010,1,5,19,1,0.107481,0.012523,0.094958,0.300962,0.334883,0.204745,0.121272
2,2011-08-18 00:35:00,5.7,2,2011,8,18,4,3,0.019212,0.070754,0.051542,0.317619,0.315415,0.121468,0.14068
3,2012-04-21 04:30:42,7.7,1,2012,4,21,8,5,0.029386,0.038235,0.058747,0.293884,0.32327,0.153987,0.133475
4,2010-03-09 07:51:00,5.3,1,2010,3,9,10,1,0.027194,0.092135,0.119329,0.309714,0.314028,0.100087,0.100417


# Spark Pipeline

**Для всех числовых признаков применяем VectorAssembler и StandartScaler**

In [26]:
num_features = ['passenger_count', 'year', 'month', 'day', 'hour', 'weekday', 'manhattan',
                'downtown_pickup_distance', 'downtown_dropoff_distance',
                'jfk_pickup_distance', 'jfk_dropoff_distance',
                'lgr_pickup_distance', 'lgr_dropoff_distance']

unscaled_assembler = VectorAssembler(inputCols=num_features, outputCol="unscaled_features")

scaler = StandardScaler(inputCol="unscaled_features", outputCol="scaled_features")

stages = [unscaled_assembler, scaler]

In [27]:
stages

[VectorAssembler_d60e9609037f, StandardScaler_cf2f173b6d1d]

**Применяем пайплайн к данным**

In [28]:
pipeline = Pipeline(stages=stages)

pipeline_model = pipeline.fit(spark_df)

df_transform = pipeline_model.transform(spark_df)

In [29]:
type(df_transform)

pyspark.sql.dataframe.DataFrame

In [30]:
df_transform.limit(5).to_koalas()

Unnamed: 0,key,fare_amount,passenger_count,year,month,day,hour,weekday,manhattan,downtown_pickup_distance,downtown_dropoff_distance,jfk_pickup_distance,jfk_dropoff_distance,lgr_pickup_distance,lgr_dropoff_distance,unscaled_features,scaled_features
0,2009-06-15 17:26:21,4.5,1,2009,6,15,21,0,0.011742,0.16923,0.166668,0.139241,0.127499,0.07437,0.086112,"[1.0, 2009.0, 6.0, 15.0, 21.0, 0.0, 0.01174200...","[0.7640796835705781, 1076.5277564787634, 1.742..."
1,2010-01-05 16:52:16,16.9,1,2010,1,5,19,1,0.107481,0.012523,0.094958,0.300962,0.334883,0.204745,0.121272,"[1.0, 2010.0, 1.0, 5.0, 19.0, 1.0, 0.107480999...","[0.7640796835705781, 1077.0636090205646, 0.290..."
2,2011-08-18 00:35:00,5.7,2,2011,8,18,4,3,0.019212,0.070754,0.051542,0.317619,0.315415,0.121468,0.14068,"[2.0, 2011.0, 8.0, 18.0, 4.0, 3.0, 0.019212000...","[1.5281593671411562, 1077.5994615623658, 2.323..."
3,2012-04-21 04:30:42,7.7,1,2012,4,21,8,5,0.029386,0.038235,0.058747,0.293884,0.32327,0.153987,0.133475,"[1.0, 2012.0, 4.0, 21.0, 8.0, 5.0, 0.029386000...","[0.7640796835705781, 1078.1353141041673, 1.161..."
4,2010-03-09 07:51:00,5.3,1,2010,3,9,10,1,0.027194,0.092135,0.119329,0.309714,0.314028,0.100087,0.100417,"[1.0, 2010.0, 3.0, 9.0, 10.0, 1.0, 0.027194000...","[0.7640796835705781, 1077.0636090205646, 0.871..."


**Выбираем столбцы *scaled_features* (нормированные числовые признаки после пайплайна) и *fare_amount* – лейбл для предсказания**

In [31]:
df_transform_fin = df_transform.select('scaled_features', 'fare_amount')
df_transform_fin.limit(5).to_koalas()

Unnamed: 0,scaled_features,fare_amount
0,"[0.7640796835705781, 1076.5277564787634, 1.742...",4.5
1,"[0.7640796835705781, 1077.0636090205646, 0.290...",16.9
2,"[1.5281593671411562, 1077.5994615623658, 2.323...",5.7
3,"[0.7640796835705781, 1078.1353141041673, 1.161...",7.7
4,"[0.7640796835705781, 1077.0636090205646, 0.871...",5.3


In [32]:
#df_transform_fin = df_transform_fin.orderBy(rand())

# Разбиваем данные на обучающую и тестовую выборки

In [33]:
train_df, test_df = df_transform_fin.randomSplit([.8, .2],seed=1234)

In [34]:
test_df.limit(5).to_koalas()

Unnamed: 0,scaled_features,fare_amount
0,"[0.0, 1077.0636090205646, 3.1941968313110842, ...",9.7
1,"[0.0, 1077.0636090205646, 3.484578361430274, 0...",6.5
2,"[0.0, 1077.0636090205646, 3.484578361430274, 1...",16.1
3,"[0.0, 1077.5994615623658, 0.2903815301191895, ...",16.1
4,"[0.0, 1077.5994615623658, 0.580763060238379, 1...",10.9


In [35]:
input_dim = len(train_df.select('scaled_features').first()[0])

In [36]:
input_dim

13

# Создаём модель на Keras

In [37]:
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.layers import BatchNormalization
from keras import optimizers, regularizers
from keras.optimizers import Adam

Using TensorFlow backend.


In [38]:
BATCH_SIZE = 256
EPOCHS = 50
LEARNING_RATE = 0.1

model = Sequential()
model.add(Dense(256, activation='relu', input_shape=(input_dim,)))
model.add(BatchNormalization())
model.add(Dense(128, activation='relu'))
model.add(BatchNormalization())
model.add(Dense(64, activation='relu'))
model.add(BatchNormalization())
model.add(Dense(32, activation='relu'))
model.add(BatchNormalization())
model.add(Dense(8, activation='relu'))
model.add(BatchNormalization())
model.add(Dense(1))

adam = optimizers.adam(lr=LEARNING_RATE)
model.compile(loss='mse', optimizer=adam, metrics=['mae'])

In [39]:
model.summary()

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_1 (Dense)              (None, 256)               3584      
_________________________________________________________________
batch_normalization_1 (Batch (None, 256)               1024      
_________________________________________________________________
dense_2 (Dense)              (None, 128)               32896     
_________________________________________________________________
batch_normalization_2 (Batch (None, 128)               512       
_________________________________________________________________
dense_3 (Dense)              (None, 64)                8256      
_________________________________________________________________
batch_normalization_3 (Batch (None, 64)                256       
_________________________________________________________________
dense_4 (Dense)              (None, 32)               

# Подключаем Elephas и создаём ElephasEstimator

In [40]:
from elephas.ml_model import ElephasEstimator

optimizer_conf = optimizers.Adam(lr=LEARNING_RATE)
opt_conf = optimizers.serialize(optimizer_conf)

estimator = ElephasEstimator()
estimator.setFeaturesCol('scaled_features')
estimator.setLabelCol('fare_amount')
estimator.set_keras_model_config(model.to_yaml())
estimator.set_num_workers(1)
estimator.set_epochs(EPOCHS) 
estimator.set_batch_size(BATCH_SIZE)
estimator.set_verbosity(1)
estimator.set_validation_split(0.10)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("mse")
estimator.set_metrics(['mae'])



ElephasEstimator_34fe652b90b2

**Создаём пайплайн из эстиматора**

In [41]:
dl_pipeline = Pipeline(stages=[estimator])

**Эстиматор падает с ошибкой**

In [40]:
dl_pipeline.fit(train_df)

>>> Fit model
total:0
count:0
UnreadVariable
UnreadVariable
UnreadVariable
UnreadVariable


Traceback (most recent call last):
  File "D:\Spark\spark-2.4.4-bin-hadoop2.7\python\pyspark\serializers.py", line 590, in dumps
    return cloudpickle.dumps(obj, 2)
  File "D:\Spark\spark-2.4.4-bin-hadoop2.7\python\pyspark\cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "D:\Spark\spark-2.4.4-bin-hadoop2.7\python\pyspark\cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "D:\Anaconda\envs\taxi\lib\pickle.py", line 409, in dump
    self.save(obj)
  File "D:\Anaconda\envs\taxi\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "D:\Anaconda\envs\taxi\lib\pickle.py", line 751, in save_tuple
    save(element)
  File "D:\Anaconda\envs\taxi\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "D:\Spark\spark-2.4.4-bin-hadoop2.7\python\pyspark\cloudpickle.py", line 406, in save_function
    self.save_function_tuple(obj)
  File "D:\Spark\spark-2.4.4-bin-hadoop2.7\p

PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

# Теоретический метод для работы модели с пайплайном и Elephas эстиматором

In [42]:
def dl_pipeline_fit_score_results(dl_pipeline, label):
    fit_dl_pipeline = dl_pipeline.fit(train_df)
    pred_train = fit_dl_pipeline.transform(train_df)
    pred_test = fit_dl_pipeline.transform(test_df)
    pnl_train = pred_train.select(label, "prediction")
    pnl_test = pred_test.select(label, "prediction")
    
    pred_and_label_train = pnl_train.rdd.map(lambda row: (row[label], row['prediction']))
    pred_and_label_test = pnl_test.rdd.map(lambda row: (row[label], row['prediction']))

In [None]:
dl_pipeline_fit_score_results(dl_pipeline, 'fare_amount')

# Попробовали запустить по-другому, ошибка та же

In [43]:
from elephas.ml.adapter import df_to_simple_rdd
rdd = df_to_simple_rdd(train_df, features_col='scaled_features', label_col='fare_amount')

In [43]:
from elephas.spark_model import SparkModel

spark_model = SparkModel(model, frequency='epoch', mode='synchronous')
spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)

>>> Fit model
total:0
count:0
UnreadVariable
UnreadVariable
UnreadVariable
UnreadVariable
UnreadVariable
UnreadVariable


Traceback (most recent call last):
  File "D:\Spark\spark-2.4.4-bin-hadoop2.7\python\pyspark\serializers.py", line 590, in dumps
    return cloudpickle.dumps(obj, 2)
  File "D:\Spark\spark-2.4.4-bin-hadoop2.7\python\pyspark\cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "D:\Spark\spark-2.4.4-bin-hadoop2.7\python\pyspark\cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "D:\Anaconda\envs\taxi\lib\pickle.py", line 409, in dump
    self.save(obj)
  File "D:\Anaconda\envs\taxi\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "D:\Anaconda\envs\taxi\lib\pickle.py", line 751, in save_tuple
    save(element)
  File "D:\Anaconda\envs\taxi\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "D:\Spark\spark-2.4.4-bin-hadoop2.7\python\pyspark\cloudpickle.py", line 406, in save_function
    self.save_function_tuple(obj)
  File "D:\Spark\spark-2.4.4-bin-hadoop2.7\p

PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

# Ошибки с которыми столкнулись:
1. https://github.com/maxpumperla/elephas/issues/148 – эта ошибка решилась установкой TensorFlow 2
2. https://github.com/maxpumperla/elephas/issues/82 
3. https://github.com/maxpumperla/elephas/issues/146
4. https://github.com/maxpumperla/elephas/issues/151