In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from pyspark.sql.functions import current_timestamp, to_timestamp, concat, col, lit
from pyspark.sql import functions as sf

# Start (or reuse) the Spark session that the rest of the notebook reads through.
spark = (
    SparkSession.builder
    .appName('Practise')
    .getOrCreate()
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/30 20:46:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load drivers.csv (header row, inferred column types), then:
#   * build a full name by joining `forename` and `surname` with a space,
#   * rename `number`, `name` and `nationality` with a `driver_` prefix so
#     they stay distinguishable after the joins further down.
drivers_df = (
    spark.read
    .csv(
        '/Users/mac/Documents/dataengineer/f1db_csv/drivers.csv',
        header=True,
        inferSchema=True,
    )
    # Create a column by concatenating two columns.
    .withColumn(
        'name',
        sf.concat(sf.col('forename'), sf.lit(' '), sf.col('surname')),
    )
    .withColumnRenamed('number', 'driver_number')
    .withColumnRenamed('name', 'driver_name')
    .withColumnRenamed('nationality', 'driver_nationality')
)


In [3]:
# Load constructors.csv (header row, inferred column types) and expose the
# `name` column under the name `team`.
constructors_df = (
    spark.read
    .csv(
        '/Users/mac/Documents/dataengineer/f1db_csv/constructors.csv',
        header=True,
        inferSchema=True,
    )
    .withColumnRenamed('name', 'team')
)


In [4]:
# getOrCreate() hands back the session under the same application name.
spark = SparkSession.builder.appName("Practise").getOrCreate()

# Explicit schema for races.csv, one entry per column:
# (column name, Spark type, nullable).
races_schema = StructType(fields=[
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ('raceId', IntegerType(), False),
        ('year', IntegerType(), True),
        ('round', IntegerType(), True),
        ('circuitId', IntegerType(), True),
        ('name', StringType(), True),
        ('date', DateType(), True),
        ('time', StringType(), True),
        ('url', StringType(), True),
    )
])

# Read the races with that schema, then in a single pipeline:
#   1. record when the rows were ingested (`ingestion_date`),
#   2. merge `date` and `time` into one timestamp parsed as
#      'yyyy-MM-dd HH:mm:ss',
#   3. keep the columns used downstream under snake_case names.
races_df = (
    spark.read
    .option('header', True)
    .schema(races_schema)
    .csv('/Users/mac/Documents/dataengineer/f1db_csv/races.csv')
    .withColumn('ingestion_date', current_timestamp())
    .withColumn(
        'race_timestamp',
        to_timestamp(
            concat(col('date'), lit(' '), col('time')),
            'yyyy-MM-dd HH:mm:ss',
        ),
    )
    .select(
        col('raceId').alias('race_id'),
        col('year').alias('race_year'),
        col('round'),
        col('circuitId').alias('circuit_id'),
        col('name').alias('race_name'),
        col('ingestion_date'),
        col('race_timestamp').alias('race_date'),
    )
)
                                                    

In [5]:
# Load circuits.csv (header row, inferred column types) and rename
# `location` to `circuit_location`.
circuits_df = (
    spark.read
    .csv(
        '/Users/mac/Documents/dataengineer/f1db_csv/circuits.csv',
        header=True,
        inferSchema=True,
    )
    .withColumnRenamed('location', 'circuit_location')
)


In [6]:
# Load results.csv (header row, inferred column types) and rename `time`
# to `race_time`.
results_df = (
    spark.read
    .csv(
        '/Users/mac/Documents/dataengineer/f1db_csv/results.csv',
        header=True,
        inferSchema=True,
    )
    .withColumnRenamed('time', 'race_time')
)


In [7]:
# Preview the circuits table: first 20 rows, long cell values truncated
# (these are show()'s defaults, spelled out here).
circuits_df.show(n=20, truncate=True)

+---------+--------------+--------------------+----------------+---------+--------+---------+---+--------------------+
|circuitId|    circuitRef|                name|circuit_location|  country|     lat|      lng|alt|                 url|
+---------+--------------+--------------------+----------------+---------+--------+---------+---+--------------------+
|        1|   albert_park|Albert Park Grand...|       Melbourne|Australia|-37.8497|  144.968| 10|http://en.wikiped...|
|        2|        sepang|Sepang Internatio...|    Kuala Lumpur| Malaysia| 2.76083|  101.738| 18|http://en.wikiped...|
|        3|       bahrain|Bahrain Internati...|          Sakhir|  Bahrain| 26.0325|  50.5106|  7|http://en.wikiped...|
|        4|     catalunya|Circuit de Barcel...|        Montmeló|    Spain|   41.57|  2.26111|109|http://en.wikiped...|
|        5|      istanbul|       Istanbul Park|        Istanbul|   Turkey| 40.9517|   29.405|130|http://en.wikiped...|
|        6|        monaco|   Circuit de Monaco| 

In [8]:
# Attach each race to its circuit (inner join on the circuit id) and keep
# only the race columns plus the circuit's location.
race_circuits_df = (
    races_df
    .join(
        circuits_df,
        on=races_df['circuit_id'] == circuits_df['circuitId'],
        how='inner',
    )
    .select(
        races_df['race_id'],
        races_df['race_year'],
        races_df['race_name'],
        races_df['race_date'],
        circuits_df['circuit_location'],
    )
)


In [9]:
# Preview the results table: first 20 rows, long cell values truncated
# (these are show()'s defaults, spelled out here).
results_df.show(n=20, truncate=True)

+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|  race_time|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|       1|    18|       1|            1|    22|   1|       1|           1|            1|  10.0|  58|1:34:50.616|     5690616|        39|   2|      1:27.452|        218.300|       1|
|       2|    18|       2|            2|     3|   5|       2|           2|            2|   8.0|  58|     +5.478|     5696094|        41|   3|      1:27.739|        217.586|       1|
|       3|    18|       3|            3|     7|   7|       3|           3|            3|  

In [10]:
# Enrich every result row with its race/circuit, driver and constructor.
# All three joins are inner joins keyed on the ids carried by results_df.
race_results_df = (
    results_df
    .join(
        race_circuits_df,
        on=results_df['raceId'] == race_circuits_df['race_id'],
        how='inner',
    )
    .join(
        drivers_df,
        on=results_df['driverId'] == drivers_df['driverId'],
        how='inner',
    )
    .join(
        constructors_df,
        on=results_df['constructorId'] == constructors_df['constructorId'],
        how='inner',
    )
)


In [11]:
# Build the report from the joined results:
#   1. keep only the presentation columns,
#   2. add `created_date` from current_timestamp(),
#   3. restrict to the 2020 Abu Dhabi Grand Prix,
#   4. order drivers by points, highest first.
final_df = (
    race_results_df
    .select(
        'race_year', 'race_name', 'race_date', 'circuit_location',
        'driver_name', 'driver_number', 'driver_nationality', 'team',
        'grid', 'fastestLap', 'race_time', 'points', 'position',
    )
    .withColumn('created_date', current_timestamp())
    .filter("race_year == 2020 and race_name == 'Abu Dhabi Grand Prix'")
    .orderBy(col('points').desc())
)

# CSV export alternatives, kept for reference:
# final_df.coalesce(1).write.csv("/Users/mac/Documents/dataengineer/f1db_csv/a")
# final_df.write.csv("/Users/mac/Documents/dataengineer/f1db_csv/abc.csv")

# Save the result as Parquet. The format is named explicitly so the output
# stays Parquet even when the session's default data source is configured
# to something else.
# NOTE(review): mode "append" adds the same rows again every time this cell
# is re-run -- confirm that is intended, or switch to "overwrite".
final_df.write.mode("append").format("parquet").save("/Users/mac/Documents/dataengineer/f1db_csv/a")

# Display the report.
final_df.show()

                                                                                

+---------+--------------------+-------------------+----------------+------------------+-------------+------------------+------------+----+----------+-----------+------+--------+--------------------+
|race_year|           race_name|          race_date|circuit_location|       driver_name|driver_number|driver_nationality|        team|grid|fastestLap|  race_time|points|position|        created_date|
+---------+--------------------+-------------------+----------------+------------------+-------------+------------------+------------+----+----------+-----------+------+--------+--------------------+
|     2020|Abu Dhabi Grand Prix|2020-12-13 13:10:00|       Abu Dhabi|    Max Verstappen|           33|             Dutch|    Red Bull|   1|        14|1:36:28.645|  25.0|       1|2022-08-30 20:46:...|
|     2020|Abu Dhabi Grand Prix|2020-12-13 13:10:00|       Abu Dhabi|   Valtteri Bottas|           77|           Finnish|    Mercedes|   2|        40|    +15.976|  18.0|       2|2022-08-30 20:46:...|


22/08/30 22:12:45 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1983324 ms exceeds timeout 120000 ms
22/08/30 22:12:45 WARN SparkContext: Killing executors is not supported by current scheduler.
