# Create environment

In [227]:
import pandas as pd
import numpy as np
%matplotlib inline

In [230]:
# Create a connection
import pyspark as ps
from pyspark import SparkConf, SparkContext

# Stop a context left over from a previous run of this cell so the settings
# below take effect. On a fresh kernel `sc` does not exist yet; that is the
# only failure expected here, so catch exactly that instead of everything.
try:
    sc.stop()
except NameError:
    print('SparkContext is not created!')

# getOrCreate is a classmethod: call it on the class with a SparkConf instead
# of constructing a context and then asking that instance for the active one.
conf = SparkConf().setMaster("local").setAppName("App")
sc = SparkContext.getOrCreate(conf)
print(sc, sc.version)

<SparkContext master=local appName=App> 2.4.5


In [231]:
from pyspark.sql import SparkSession

# Obtain the session through the builder: an already active session is reused,
# otherwise a new one is created with default settings.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f4ebc0a25c0>


# Dataset

In [234]:
# List all tables registered in the Spark session's catalog
spark.catalog.listTables()

[]

In [None]:
ls dataset/dataset_csv/

In [232]:
# Load the CSV with its header row as column names and let Spark infer the
# column types, then preview the first five rows.
csv_reader = spark.read.format('csv').options(header='true', inferSchema='true')
dataset = csv_reader.load(
    'dataset/dataset_csv/current-data-on-the-geographic-distribution-of-covid-19-cases-worldwide.csv'
)
dataset.show(5)

+-------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+------------+
|daterep|day|month|year|cases|deaths|countriesandterritories|geoid|countryterritorycode|popdata2018|continentexp|
+-------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+------------+
|   null| 28|    4|2020|  172|     0|            Afghanistan|   AF|                 AFG|   37172386|        Asia|
|   null| 27|    4|2020|   68|    10|            Afghanistan|   AF|                 AFG|   37172386|        Asia|
|   null| 26|    4|2020|  112|     4|            Afghanistan|   AF|                 AFG|   37172386|        Asia|
|   null| 25|    4|2020|   70|     1|            Afghanistan|   AF|                 AFG|   37172386|        Asia|
|   null| 24|    4|2020|  105|     2|            Afghanistan|   AF|                 AFG|   37172386|        Asia|
+-------+---+-----+----+-----+------+-----------------------+-----+--------------------+

In [233]:
# Print the column names and the types inferred while reading the CSV
dataset.printSchema()

root
 |-- daterep: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- cases: integer (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- countriesandterritories: string (nullable = true)
 |-- geoid: string (nullable = true)
 |-- countryterritorycode: string (nullable = true)
 |-- popdata2018: integer (nullable = true)
 |-- continentexp: string (nullable = true)



In [236]:
# Register the DataFrame as the temporary view 'data_temp' so it can be
# queried with spark.sql, then list the catalog to confirm it is there.
dataset.createOrReplaceTempView('data_temp')
spark.catalog.listTables()

[Table(name='data_temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

Some useful functions:
<br>.withColumn("newColumnName", formula)
<br>.withColumnRenamed("oldColumnName", "newColumnName")
<br>.select("column1", "column2", …, "columnN", formula)
<br>.selectExpr("column1", "column2", …, "columnN", "formulaExpr")
<br>.filter(condition)
<br>.groupBy("column1", "column2", …, "columnN").[avg(), min(), max(), sum()]
<br>.join(tableName, on="columnNameJoin", how="leftouter")

In [257]:
# Total cases and deaths per country, highest death toll first.
# Adjacent string literals are concatenated into a single query string.
totals_by_country_sql = (
    'SELECT SUM(cases) AS casesByCountry, SUM(deaths) AS deathsByCountry, countriesandterritories '
    'FROM data_temp '
    'GROUP BY countriesandterritories '
    'ORDER BY deathsByCountry DESC'
)
spark.sql(totals_by_country_sql).show()

+--------------+---------------+-----------------------+
|casesByCountry|deathsByCountry|countriesandterritories|
+--------------+---------------+-----------------------+
|        988451|          56245|   United_States_of_...|
|        199414|          26977|                  Italy|
|        128339|          23293|                 France|
|        209465|          23190|                  Spain|
|        157149|          21092|         United_Kingdom|
|         46687|           7207|                Belgium|
|        156337|           5913|                Germany|
|         91472|           5806|                   Iran|
|         83938|           4637|                  China|
|         66501|           4543|                 Brazil|
|         38245|           4518|            Netherlands|
|        112261|           2900|                 Turkey|
|         48489|           2707|                 Canada|
|         18926|           2274|                 Sweden|
|         15529|           1434

# Transform Raw data into Features

In [None]:
# Remove the daterep column (it is null in every row of the preview above;
# day/month/year carry the date instead).
# Note: this rebinds `dataset`, so later cells see the frame without daterep.
dataset = dataset.drop("daterep")

In [None]:
# Discard every row that contains a null in any column
dataset_nonull = dataset.dropna()

In [None]:
# Check which columns remain after dropping daterep and the null rows
dataset_nonull.printSchema()

In [None]:
from pyspark.ml import Pipeline  # chains feature stages into a single estimator
from pyspark.ml.feature import StringIndexer  # string category -> numeric index

# String columns to encode. A plain list keeps the stage order, and therefore
# the order of the appended "<name>Index" columns, identical on every run;
# a list(set(...)) round-trip would make it depend on string hashing.
categoricalColumns = ['countriesandterritories', 'geoid', 'countryterritorycode', 'continentexp']

# Keep the indexers unfitted: Pipeline.fit() fits each estimator stage itself,
# so the pipeline stays reusable on other data instead of wrapping models that
# were already fitted outside of it.
indexers = [StringIndexer(inputCol=column, outputCol=column + "Index")
            for column in categoricalColumns]

pipeline = Pipeline(stages=indexers)
datasetIndexed = pipeline.fit(dataset_nonull).transform(dataset_nonull)

In [None]:
# Confirm that the "<column>Index" columns were appended by the indexers
datasetIndexed.printSchema()

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler  # packs columns into one vector column

# Model inputs: the numeric columns first, then the index-encoded string columns.
featureColumns = [
    "day", "month", "year", "cases", "popdata2018",
    "continentexpIndex", "countriesandterritoriesIndex",
    "countryterritorycodeIndex", "geoidIndex",
]

# Combine the inputs into a single "features" vector column.
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
datasetVector = assembler.transform(datasetIndexed)

In [None]:
from pyspark.ml.feature import MinMaxScaler

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate the MinMaxScalerModel.
# The fitted model gets its own name: the transform below uses `scalerModel`,
# which was never defined while the fit result was assigned back to `scaler`.
scalerModel = scaler.fit(datasetVector)

# Rescale each feature to the range [min, max].
datasetScaled = scalerModel.transform(datasetVector)

In [None]:
from pyspark.ml.feature import ChiSqSelector

# Chi-squared feature selection against the "deaths" column: keep the single
# top-ranked entry of "scaledFeatures" in a new "selectedFeatures" column.
selector = ChiSqSelector(
    numTopFeatures=1,
    featuresCol="scaledFeatures",
    outputCol="selectedFeatures",
    labelCol="deaths",
)

selectorModel = selector.fit(datasetScaled)
datasetSelectedFeatures = selectorModel.transform(datasetScaled)

In [None]:
# Expose the regression target under the conventional "label" column name.
# The column is taken from datasetSelectedFeatures: `dataset_final` does not
# exist yet while the right-hand side is evaluated, so referring to it there
# raises NameError on a fresh kernel.
dataset_final = datasetSelectedFeatures.withColumn("label", datasetSelectedFeatures["deaths"])

# Training

In [None]:
# Random split with weights 0.9 / 0.1 (roughly 90% train, 10% test);
# the fixed seed is passed so the split can be repeated.
train, test = dataset_final.randomSplit([0.9, 0.1], seed=12345)

In [None]:
from pyspark.ml.regression import LinearRegression
# Linear regression on the min-max scaled feature vector, predicting "label".
# NOTE(review): maxIter=1 caps training at a single iteration — confirm this is intentional.
lr = LinearRegression(maxIter=1, featuresCol='scaledFeatures', labelCol='label')

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Candidate regularisation strengths to compare.
gridBuilder = ParamGridBuilder()
gridBuilder = gridBuilder.addGrid(lr.regParam, [0.1, 0.01])
paramGrid = gridBuilder.build()

# Hold out 20% of the data passed to fit() for validation and score every
# candidate with a default RegressionEvaluator.
tvs = TrainValidationSplit(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(),
    trainRatio=0.8,
)

In [None]:
# Fit the linear regression on the training split.
# NOTE(review): this fits `lr` directly; the TrainValidationSplit `tvs` defined
# earlier is never used — confirm whether tvs.fit(train) was intended.
model = lr.fit(train)

In [None]:
# Score the held-out split and keep the actual label next to the prediction
valuesAndPreds = model.transform(test).select("label", "prediction")

In [None]:
# (prediction, label) pairs for the held-out split, built with the DataFrame
# API. The RDD-style version read an undefined `parsedData`, and would have
# replaced the DataFrame with an RDD, which has no .show() for the final cell.
valuesAndPreds = model.transform(test).select("prediction", "label")

In [None]:
# Fitted parameters of the linear model
coefficientsText = str(model.coefficients)
print("Coefficients: %s" % coefficientsText)
interceptText = str(model.intercept)
print("Intercept: %s" % interceptText)

# Training-set diagnostics exposed by the fitted model
trainingSummary = model.summary

iterationCount = trainingSummary.totalIterations
print("numIterations: %d" % iterationCount)

objectiveText = str(trainingSummary.objectiveHistory)
print("objectiveHistory: %s" % objectiveText)

trainingResiduals = trainingSummary.residuals
trainingResiduals.show()

trainingRmse = trainingSummary.rootMeanSquaredError
print("RMSE: %f" % trainingRmse)

trainingR2 = trainingSummary.r2
print("r2: %f" % trainingR2)

In [None]:
# Preview a bounded number of predictions. Passing valuesAndPreds.count() as
# the row limit printed the entire DataFrame and ran an extra count job.
PREVIEW_ROWS = 20
valuesAndPreds.show(PREVIEW_ROWS)