## Create Spark session and context

In [6]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Build (or reuse) a SparkSession named "Analysis" against a local master
# ("local[2]"), with spark.sql.shuffle.partitions set to "2".
spark = (
    SparkSession.builder
    .config("spark.sql.shuffle.partitions", "2")
    .appName("Analysis")
    .master("local[2]")
    .getOrCreate()
)

# To run on YARN instead, build the session with .master("yarn"), e.g.:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()

# Handle to the SparkContext that backs the session.
sc = spark.sparkContext

In [7]:
# Run the shared configuration file so its definitions are available to the cells below.
# NOTE(review): `data` and `processed_folder_path`, used later in this notebook,
# are presumably defined there — confirm.
%run "../includes/configuration"

## Define schema

In [8]:
# Explicit schema for circuits.csv: column name, Spark type, nullable flag.
# circuitId is declared non-nullable here, but the printSchema() output later in
# this notebook reports it as nullable after the CSV read, so do not rely on
# this flag to enforce non-null ids.
circuits_schema = (
    StructType()
    .add("circuitId", IntegerType(), False)
    .add("circuitRef", StringType(), True)
    .add("name", StringType(), True)
    .add("location", StringType(), True)
    .add("country", StringType(), True)
    .add("lat", DoubleType(), True)
    .add("lng", DoubleType(), True)
    .add("alt", IntegerType(), True)
    .add("url", StringType(), True)
)

## Read the CSV file from HDFS & apply schema

In [9]:
# Load circuits.csv with the first line treated as a header and the explicit
# schema applied (no schema inference).
circuits_df = spark.read.csv(
    f"{data}/circuits.csv",
    schema=circuits_schema,
    header=True,
)

In [10]:
# Preview the first five rows of the raw circuits data.
circuits_df.show(n=5)

+---------+-----------+--------------------+------------+---------+--------+-------+----+--------------------+
|circuitId| circuitRef|                name|    location|  country|     lat|    lng| alt|                 url|
+---------+-----------+--------------------+------------+---------+--------+-------+----+--------------------+
|        1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|144.968|  10|http://en.wikiped...|
|        2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|101.738|null|http://en.wikiped...|
|        3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|50.5106|null|http://en.wikiped...|
|        4|  catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|2.26111|null|http://en.wikiped...|
|        5|   istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517| 29.405|null|http://en.wikiped...|
+---------+-----------+--------------------+------------+---------+--------+-------+----+--------------------+
only showing top 5 rows

In [11]:
# Print the column names, types and nullability of the loaded DataFrame
# to confirm the explicit schema was applied.
circuits_df.printSchema()

root
 |-- circuitId: integer (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- url: string (nullable = true)



In [12]:
# Summary statistics (count, mean, stddev, min, max) for every column.
# NOTE(review): in the output below `alt` has a count of 1 while the other
# columns have 74, i.e. almost every altitude was read as null — check the
# raw values / declared type for that column.
circuits_df.describe().show()

+-------+-----------------+----------+-------+---------+---------+------------------+------------------+----+--------------------+
|summary|        circuitId|circuitRef|   name| location|  country|               lat|               lng| alt|                 url|
+-------+-----------------+----------+-------+---------+---------+------------------+------------------+----+--------------------+
|  count|               74|        74|     74|       74|       74|                74|                74|   1|                  74|
|   mean|             37.5|      null|   null|     null|     null|33.698638243243224|3.1288148648648644|10.0|                null|
| stddev|21.50581316760657|      null|   null|     null|     null| 23.27327352478035| 66.04182770715761|null|                null|
|    min|                1|       BAK|A1-Ring|Abu Dhabi|Argentina|          -37.8497|          -118.189|  10|http://en.wikiped...|
|    max|               74|    zolder| Zolder|Zandvoort|  Vietnam|           57.265

## Remove unwanted columns

In [13]:
# Keep every column except `url`, preserving the original column order.
circuits_df_selected = circuits_df.select(
    "circuitId",
    "circuitRef",
    "name",
    "location",
    "country",
    "lat",
    "lng",
    "alt",
)

In [14]:
# Preview the first five rows after dropping the unwanted column.
circuits_df_selected.show(n=5)

+---------+-----------+--------------------+------------+---------+--------+-------+----+
|circuitId| circuitRef|                name|    location|  country|     lat|    lng| alt|
+---------+-----------+--------------------+------------+---------+--------+-------+----+
|        1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|144.968|  10|
|        2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|101.738|null|
|        3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|50.5106|null|
|        4|  catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|2.26111|null|
|        5|   istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517| 29.405|null|
+---------+-----------+--------------------+------------+---------+--------+-------+----+
only showing top 5 rows



## Rename the columns as required

In [15]:
# Rename the camelCase / abbreviated source columns to snake_case,
# fully spelled-out names; all other columns are left untouched.
circuits_renamed_df = (
    circuits_df_selected
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
)

In [18]:
# Preview the first five rows with the renamed columns.
circuits_renamed_df.show(n=5)

+----------+-----------+--------------------+------------+---------+--------+---------+--------+
|circuit_id|circuit_ref|                name|    location|  country|latitude|longitude|altitude|
+----------+-----------+--------------------+------------+---------+--------+---------+--------+
|         1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|  144.968|      10|
|         2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|  101.738|    null|
|         3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|  50.5106|    null|
|         4|  catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|  2.26111|    null|
|         5|   istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517|   29.405|    null|
+----------+-----------+--------------------+------------+---------+--------+---------+--------+
only showing top 5 rows



## Write the output to the processed container in Parquet format

In [20]:
# Persist the cleaned circuits data as Parquet, replacing any existing output
# at the target path so the cell can be re-run safely.
circuits_renamed_df.write.parquet(
    f"{processed_folder_path}/circuits",
    mode="overwrite",
)