**Grab Data Analysis Using Spark**

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os

# Point findspark/PySpark at the JDK installed by apt and the unpacked Spark directory.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.1-bin-hadoop2.7",
})

**Start a Spark session**

In [None]:
import findspark

# findspark locates the Spark install (via SPARK_HOME) and makes pyspark
# importable, so it has to run before anything is imported from pyspark.
findspark.init()

from pyspark.sql import SparkSession

# Local session using every available core (reuses one if it already exists).
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Sanity check that the Spark download and training.csv are in the working directory.
!ls


sample_data		   spark-3.0.1-bin-hadoop2.7.tgz
spark-3.0.1-bin-hadoop2.7  training.csv


**Import modules for feature assembly and regression models**

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor


**Load the dataset**

In [None]:
# Path of the Grab training data and its column layout. The schema is the one
# inferSchema produced for this file (see the printSchema output); giving it
# explicitly avoids Spark's extra full pass over the CSV just to infer types.
# Names are backtick-quoted because `timestamp`/`day` are also SQL keywords.
TRAINING_CSV = '/content/training.csv'
TRAINING_SCHEMA = '`geohash6` STRING, `day` INT, `timestamp` STRING, `demand` DOUBLE'

dataset = spark.read.csv(TRAINING_CSV, schema=TRAINING_SCHEMA, header=True)

In [None]:
# Print the column names, types and nullability Spark assigned to the CSV columns.
dataset.printSchema()

root
 |-- geohash6: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- demand: double (nullable = true)



In [None]:
# Preview the first 20 rows (long values truncated), i.e. show()'s defaults spelled out.
dataset.show(n=20, truncate=True)

+--------+---+---------+--------------------+
|geohash6|day|timestamp|              demand|
+--------+---+---------+--------------------+
|  qp03wc| 18|     20:0|0.020071790937458638|
|  qp03pn| 10|    14:30|0.024720973824962248|
|  qp09sw|  9|     6:15|  0.1028209630305961|
|  qp0991| 32|      5:0| 0.08875479879512088|
|  qp090q| 15|      4:0| 0.07446838607998965|
|  qp03tu|  1|    12:15|0.023843140095447555|
|  qp096d| 25|     3:30|0.007460442605732274|
|  qp03nr| 51|    20:45|2.926078073538535E-4|
|  qp093r| 48|     6:15| 0.05417026945027532|
|  qp03r2|  4|    22:15| 0.12346297776239995|
|  qp09kb| 45|     9:15|0.026177137851266564|
|  qp03rp| 52|    11:45|  0.3594058680024252|
|  qp03w9| 46|    12:15|  0.5141364260243876|
|  qp096m| 34|    14:45| 0.05973996712236172|
|  qp091u| 40|     2:30|0.026408691368504468|
|  qp03md| 14|    14:45|0.013998466138292141|
|  qp099z| 27|     3:30|0.008413960369765126|
|  qp0990|  6|    23:45|0.029400141187935276|
|  qp03mf| 48|    11:30| 0.0572549

In [None]:
# Number of rows in the loaded training data.
row_count = dataset.count()
row_count

4206321

In [None]:
# Summary statistics (count/mean/stddev/min/max), shown as one table per column.
for column_name in dataset.columns:
    column_summary = dataset.describe(column_name)
    column_summary.show()

+-------+--------+
|summary|geohash6|
+-------+--------+
|  count| 4206321|
|   mean|    null|
| stddev|    null|
|    min|  qp02yc|
|    max|  qp0dnn|
+-------+--------+

+-------+------------------+
|summary|               day|
+-------+------------------+
|  count|           4206321|
|   mean|31.452991818741367|
| stddev|17.682781501454198|
|    min|                 1|
|    max|                61|
+-------+------------------+

+-------+---------+
|summary|timestamp|
+-------+---------+
|  count|  4206321|
|   mean|     null|
| stddev|     null|
|    min|      0:0|
|    max|     9:45|
+-------+---------+

+-------+--------------------+
|summary|              demand|
+-------+--------------------+
|  count|             4206321|
|   mean| 0.10509069575559664|
| stddev| 0.15926545710403398|
|    min|3.092217237505082...|
|    max|                 1.0|
+-------+--------------------+



In [None]:
# Count nulls in every column with ONE aggregation job instead of a separate
# filter + count job per column (each of which re-scans all rows).
# F.when(cond, 1) is null wherever cond is false and F.count skips nulls,
# so each aggregate counts exactly the null entries of its column.
from pyspark.sql import functions as F

null_counts = dataset.select(
    [F.count(F.when(dataset[c].isNull(), 1)).alias(c) for c in dataset.columns]
).first()

for col in dataset.columns:
    print(col, "\t" "with null values:", null_counts[col])

geohash6 	with null values: 0
day 	with null values: 0
timestamp 	with null values: 0
demand 	with null values: 0


In [None]:
# Collect the whole Spark DataFrame into a local pandas DataFrame.
# NOTE(review): toPandas() loads every row into driver memory (4,206,321 rows
# per the count output above); the pandas-based feature steps that follow rely on it.
pandas_df = dataset.toPandas()

In [None]:
# Preview the first rows of the collected pandas frame.
pandas_df.head(n=5)

Unnamed: 0,geohash6,day,timestamp,demand,location
0,qp03wc,18,1200,0.020072,"(-5.35, 90.7)"
1,qp03pn,10,870,0.024721,"(-5.41, 90.7)"
2,qp09sw,9,375,0.102821,"(-5.33, 90.9)"
3,qp0991,32,300,0.088755,"(-5.35, 90.8)"
4,qp090q,15,240,0.074468,"(-5.41, 90.7)"


In [None]:
!pip install pygeohash

Collecting pygeohash
  Downloading https://files.pythonhosted.org/packages/2c/33/c912fa4476cedcd3ed9cd25c44c163583b92d319860438e6b632f7f42d0c/pygeohash-1.2.0.tar.gz
Building wheels for collected packages: pygeohash
  Building wheel for pygeohash (setup.py) ... [?25l[?25hdone
  Created wheel for pygeohash: filename=pygeohash-1.2.0-py2.py3-none-any.whl size=6162 sha256=78ecdf571b3ae193156d8af32fe31fc653be079c90395ce8a6b2888a98710641
  Stored in directory: /root/.cache/pip/wheels/3f/5f/14/989d83a271207dda28232746d63e737a2dbd88ea7f7a9db807
Successfully built pygeohash
Installing collected packages: pygeohash
Successfully installed pygeohash-1.2.0


In [None]:
import pygeohash as pgh

# location: new column holding the (lat, lon) tuple decoded from geohash6.
# Geohash values repeat across rows (presumably far fewer distinct values than
# rows -- not verified here), so decode each distinct value once and map the
# results back, instead of a row-wise DataFrame.apply(axis=1) that builds a
# Series object for every one of the millions of rows.
geohash_to_location = {gh: pgh.decode(gh) for gh in pandas_df['geohash6'].unique()}
pandas_df['location'] = pandas_df['geohash6'].map(geohash_to_location)

In [None]:
# Inspect the newly added `location` column.
pandas_df.head(n=5)

Unnamed: 0,geohash6,day,timestamp,demand,location
0,qp03wc,18,1200,0.020072,"(-5.35, 90.7)"
1,qp03pn,10,870,0.024721,"(-5.41, 90.7)"
2,qp09sw,9,375,0.102821,"(-5.33, 90.9)"
3,qp0991,32,300,0.088755,"(-5.35, 90.8)"
4,qp090q,15,240,0.074468,"(-5.41, 90.7)"


In [None]:
def get_min(time_str):
    """Convert an ``"H:M"`` time-of-day string to minutes since midnight.

    Hours and minutes are not zero-padded in this dataset, e.g.
    ``"20:0"`` -> 1200 and ``"14:30"`` -> 870.

    Args:
        time_str: time of day formatted as ``"<hours>:<minutes>"``.

    Returns:
        int: ``hours * 60 + minutes``.

    Raises:
        ValueError: if ``time_str`` does not contain exactly one ``':'`` or
            either part is not an integer.
    """
    hours, minutes = time_str.split(':')
    return int(hours) * 60 + int(minutes)

# Replace the "H:M" strings with minutes since midnight (via get_min).
# The dtype guard keeps this cell idempotent: on a re-run the column already
# holds ints, and calling get_min on an int would raise AttributeError.
if pandas_df['timestamp'].dtype == object:
    pandas_df['timestamp'] = pandas_df['timestamp'].apply(get_min)

In [None]:
# Inspect the converted minute-of-day `timestamp` values.
pandas_df.head(n=5)

In [None]:
import pyspark
import pandas as pd

# findspark.init() and the SparkSession were already set up in the earlier
# session-setup cell; SparkSession.builder.getOrCreate() would only return that
# same session, so the existing `spark` is reused instead of re-initialising.

# Convert the enriched pandas frame (minute-of-day timestamps, decoded
# locations) back into a Spark DataFrame for the MLlib steps below.
spark_df = spark.createDataFrame(pandas_df)


In [None]:
# Pack the model inputs into a single vector column named "Attributes";
# minute-of-day ('timestamp') is currently the only feature.
assembler = VectorAssembler(
    inputCols=['timestamp'],
    outputCol='Attributes',
)
output = assembler.transform(spark_df)

# Keep only the feature vector and the 'demand' label column.
finalized_data = output.select(['Attributes', 'demand'])
finalized_data.show(20)

+----------+--------------------+
|Attributes|              demand|
+----------+--------------------+
|  [1200.0]|0.020071790937458638|
|   [870.0]|0.024720973824962248|
|   [375.0]|  0.1028209630305961|
|   [300.0]| 0.08875479879512088|
|   [240.0]| 0.07446838607998965|
|   [735.0]|0.023843140095447555|
|   [210.0]|0.007460442605732274|
|  [1245.0]|2.926078073538535E-4|
|   [375.0]| 0.05417026945027532|
|  [1335.0]| 0.12346297776239995|
|   [555.0]|0.026177137851266564|
|   [705.0]|  0.3594058680024252|
|   [735.0]|  0.5141364260243876|
|   [885.0]| 0.05973996712236172|
|   [150.0]|0.026408691368504468|
|   [885.0]|0.013998466138292141|
|   [210.0]|0.008413960369765126|
|  [1425.0]|0.029400141187935276|
|   [690.0]| 0.05725490104544065|
|  [1425.0]| 0.11276151708068224|
+----------+--------------------+
only showing top 20 rows



**Split into training and test data and fit a linear regression model**

In [None]:
# Fixed seed so the 80/20 split -- and therefore the metrics computed from
# it -- is reproducible across Restart & Run All.
SPLIT_SEED = 42

# Split into training (80%) and test (20%) data.
train_data, test_data = finalized_data.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Linear regression (default parameters) of 'demand' on the 'Attributes' vector.
linear_regression = LinearRegression(featuresCol='Attributes', labelCol='demand')

# Fit on the training set. The fitted model keeps the name `regressor`
# so later cells that use it are unaffected.
regressor = linear_regression.fit(train_data)

# Evaluate on the held-out test set: `pred` is the evaluation summary and
# `pred.predictions` holds each test row with its predicted demand.
pred = regressor.evaluate(test_data)

pred.predictions.show()

+----------+--------------------+-------------------+
|Attributes|              demand|         prediction|
+----------+--------------------+-------------------+
|     [0.0]|7.536689528149635E-5|0.12644843863685218|
|     [0.0]|8.789500489955804E-5|0.12644843863685218|
|     [0.0]|9.547689640366464E-5|0.12644843863685218|
|     [0.0]|1.011798154584004E-4|0.12644843863685218|
|     [0.0]|1.042189517332910...|0.12644843863685218|
|     [0.0]|1.212815308943407...|0.12644843863685218|
|     [0.0]|1.587190886752327...|0.12644843863685218|
|     [0.0]|1.844611065889411...|0.12644843863685218|
|     [0.0]|1.858734534179277E-4|0.12644843863685218|
|     [0.0]|2.635521639096378E-4|0.12644843863685218|
|     [0.0]|3.067740608667005E-4|0.12644843863685218|
|     [0.0]|3.613771205550209E-4|0.12644843863685218|
|     [0.0]| 3.64138312234956E-4|0.12644843863685218|
|     [0.0]|3.993911007785682E-4|0.12644843863685218|
|     [0.0]|4.375720651061483...|0.12644843863685218|
|     [0.0]|4.62164110865350

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Named `evaluator` (not `eval`) so the Python builtin eval() is not shadowed.
evaluator = RegressionEvaluator(labelCol="demand", predictionCol="prediction", metricName="rmse")

# (RegressionEvaluator metric name, label printed for it):
#   rmse - root mean square error
#   mse  - mean square error
#   mae  - mean absolute error
#   r2   - coefficient of determination
REGRESSION_METRICS = [("rmse", "RMSE"), ("mse", "MSE"), ("mae", "MAE"), ("r2", "r2")]

scores = {}
for metric, label in REGRESSION_METRICS:
    # The param map overrides metricName for this single evaluation only.
    scores[metric] = evaluator.evaluate(pred.predictions, {evaluator.metricName: metric})
    print("%s: %.3f" % (label, scores[metric]))

# Also expose each score as its own variable for later cells.
rmse, mse, mae, r2 = (scores[metric] for metric, _ in REGRESSION_METRICS)

RMSE: 0.158
MSE: 0.025
MAE: 0.096
r2: 0.007
