# data_ingestion - single or multilines


In [1]:
# Importation des bibliothèques
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType

# Création d'une session Spark
spark = SparkSession.builder.appName("Data Ingestion").getOrCreate()

## Dataframe Reader

### Defining data files variable:

In [2]:
constructors_path = "../data/raw/constructors.json"
drivers_path = "../data/raw/drivers.json"
pit_stops_path = "../data/raw/pit_stops.json"

### One line JSON file :

{"constructorId":1,"constructorRef":"mclaren","name":"McLaren","nationality":"British","url":"http://en.wikipedia.org/wiki/McLaren"}


In [3]:
df_constructors_schema = StructType(fields=[
                                        StructField("constructorId", IntegerType(), False),
                                        StructField("constructorRef", StringType(), True),
                                        StructField("name", StringType(), True),
                                        StructField("nationality", StringType(), True),
                                        StructField("url", StringType(), True)
                                ])

df_constructors = spark.read.schema(df_constructors_schema).json(constructors_path)
df_constructors.printSchema()
df_constructors.show(n=10)

root
 |-- constructorId: integer (nullable = true)
 |-- constructorRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- url: string (nullable = true)

+-------------+--------------+-----------+-----------+--------------------+
|constructorId|constructorRef|       name|nationality|                 url|
+-------------+--------------+-----------+-----------+--------------------+
|            1|       mclaren|    McLaren|    British|http://en.wikiped...|
|            2|    bmw_sauber| BMW Sauber|     German|http://en.wikiped...|
|            3|      williams|   Williams|    British|http://en.wikiped...|
|            4|       renault|    Renault|     French|http://en.wikiped...|
|            5|    toro_rosso| Toro Rosso|    Italian|http://en.wikiped...|
|            6|       ferrari|    Ferrari|    Italian|http://en.wikiped...|
|            7|        toyota|     Toyota|   Japanese|http://en.wikiped...|
|            8|   super_agur

### Nested fields

{"driverId":1,"driverRef":"hamilton","number":44,"code":"HAM","name":{"forename":"Lewis","surname":"Hamilton"},"dob":"1985-01-07","nationality":"British","url":"http://en.wikipedia.org/wiki/Lewis_Hamilton"}


In [4]:
name_schema = StructType(fields=[
                                StructField("forename", StringType(), True),
                                StructField("surname", StringType(), True)
])

df_drivers_schema = StructType(fields=[
                                        StructField("driverId", IntegerType(), False),
                                        StructField("driverRef", StringType(), True),
                                        StructField("number", IntegerType(), True),
                                        StructField("code", StringType(), True),
                                        StructField("name", name_schema),
                                        StructField("dob", DateType(), True),
                                        StructField("nationality", StringType(), True),
                                        StructField("url", StringType(), True)
                                ])

df_drivers = spark.read.schema(df_drivers_schema).json(drivers_path)
df_drivers.printSchema()
df_drivers.show(n=10)

root
 |-- driverId: integer (nullable = true)
 |-- driverRef: string (nullable = true)
 |-- number: integer (nullable = true)
 |-- code: string (nullable = true)
 |-- name: struct (nullable = true)
 |    |-- forename: string (nullable = true)
 |    |-- surname: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- nationality: string (nullable = true)
 |-- url: string (nullable = true)

+--------+----------+------+----+--------------------+----------+-----------+--------------------+
|driverId| driverRef|number|code|                name|       dob|nationality|                 url|
+--------+----------+------+----+--------------------+----------+-----------+--------------------+
|       1|  hamilton|    44| HAM|   {Lewis, Hamilton}|1985-01-07|    British|http://en.wikiped...|
|       2|  heidfeld|  NULL| HEI|    {Nick, Heidfeld}|1977-05-10|     German|http://en.wikiped...|
|       3|   rosberg|     6| ROS|     {Nico, Rosberg}|1985-06-27|     German|http://en.wikiped...|
|      

### Multiline json file

   {<br>
      "raceId":841,<br>
      "driverId":153,<br>
      "stop":1,<br>
      "lap":1,<br>
      "time":"17:05:23",<br>
      "duration":26.898,<br>
      "milliseconds":26898<br>
   }

In [5]:
pit_stops_schema = StructType(fields=[
                                        StructField("raceId", IntegerType(), False),
                                        StructField("driverId", IntegerType(), True),
                                        StructField("stop", StringType(), True),
                                        StructField("lap", IntegerType(), True),
                                        StructField("time", StringType(), True),
                                        StructField("duration", StringType(), True),
                                        StructField("milliseconds", IntegerType(), True)
])

df_pit_stops = spark.read.json(pit_stops_path, multiLine=True, schema=pit_stops_schema)
df_pit_stops.printSchema()
df_pit_stops.show(n=10)

root
 |-- raceId: integer (nullable = true)
 |-- driverId: integer (nullable = true)
 |-- stop: string (nullable = true)
 |-- lap: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- milliseconds: integer (nullable = true)

+------+--------+----+---+--------+--------+------------+
|raceId|driverId|stop|lap|    time|duration|milliseconds|
+------+--------+----+---+--------+--------+------------+
|   841|     153|   1|  1|17:05:23|  26.898|       26898|
|   841|      30|   1|  1|17:05:52|  25.021|       25021|
|   841|      17|   1| 11|17:20:48|  23.426|       23426|
|   841|       4|   1| 12|17:22:34|  23.251|       23251|
|   841|      13|   1| 13|17:24:10|  23.842|       23842|
|   841|      22|   1| 13|17:24:29|  23.643|       23643|
|   841|      20|   1| 14|17:25:17|  22.603|       22603|
|   841|     814|   1| 14|17:26:03|  24.863|       24863|
|   841|     816|   1| 14|17:26:50|  25.259|       25259|
|   841|      67|   1| 15

## Clean and Close the door

In [6]:
spark.stop()