In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import StorageLevel

In [2]:
spark = SparkSession.builder \
    .appName("ETL-MySQL_to_Hive") \
    .master("local[*]") \
    .config("spark.streaming.stopGracefullyOnShutdown","true") \
    .config("spark.jars.packeges","org.apache.spark:spark-sql-kafka-0-10_2.13:3.1.2") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .config("hive.metastore.uris", "thrift://your-hive-metastore-uri:port") \
    .config("spark.jars", "/path/to/mysql-connector-java-x.x.xx.jar") \
    .config("spark.driver.allowMultipleContexts","true") \
    .enableHiveSupport() \
    .getOrCreate()

In [3]:
mysql_url = "jdbc:mysql://localhost:3306/ylc"
mysql_properties = {
    "user": "student",
    "password": "student",
    "driver": "com.mysql.jdbc.Driver"
}
table_name = "taxi_trips"

In [4]:
df = spark.read.jdbc(
    url=mysql_url,
    table="taxi_trips",
    properties=mysql_properties,
    column="trip_id",
    lowerBound=1,
    upperBound=20332093,
    numPartitions=20
)

In [5]:
def handle_negatives(self):
    
    for column in self.columns:
        self = self.withColumn(column, abs(self[column]))
    return self

# Attach the function to the DataFrame class
DataFrame.handle_negatives = handle_negatives

In [6]:
# Handle Negatives
no_negative_df = df.handle_negatives()
# Check Negatives
no_negative_df.select(col("fare_amount")).where(col("fare_amount")< 0 ).show(5)



+-----------+
|fare_amount|
+-----------+
+-----------+



                                                                                

In [18]:
# Check Nulls and Outliers 
no_negative_df.select(col("RatecodeID")).distinct().show()



+----------+
|RatecodeID|
+----------+
|      null|
|       1.0|
|       4.0|
|       3.0|
|       2.0|
|       6.0|
|       5.0|
+----------+





In [7]:
# Fill NA values in all columns with 0
no_negative_no_null_df = no_negative_df.fillna(0)


In [25]:
# Check Nulls Again
no_negative_no_null_df.select(col("RatecodeID")).distinct().show()

                                                                                

+----------+
|RatecodeID|
+----------+
|       0.0|
|       1.0|
|       4.0|
|       3.0|
|       2.0|
|       6.0|
|       5.0|
+----------+





In [27]:
#Check Vendor_ID outlaiers
no_negative_no_null_df.groupBy("VendorID").count().show()

                                                                                

+--------+--------+
|VendorID|   count|
+--------+--------+
|       6|    1248|
|       1| 4922071|
|       2|15408774|
+--------+--------+



In [8]:
#Handle Outliers
no_negative_no_null_df = no_negative_no_null_df.filter(col("VendorID") != 6)

In [33]:
# Check payment_type Nulls and Outliers
no_negative_no_null_df.groupBy("payment_type").count().show()

                                                                                

+------------+--------+
|payment_type|   count|
+------------+--------+
|           0| 1974737|
|           5|       2|
|           1|15111929|
|           3|  133352|
|           2| 2773067|
|           4|  337758|
+------------+--------+



In [9]:
#Persist Final_NYC (Data After Cleaning) to enhance Performance
Final_NYC = no_negative_no_null_df.repartition(200)
Final_NYC.persist(StorageLevel.DISK_ONLY)

DataFrame[Airport_fee: double, DOLocationID: bigint, PULocationID: bigint, RatecodeID: double, VendorID: bigint, congestion_surcharge: double, extra: double, fare_amount: double, improvement_surcharge: double, mta_tax: double, passenger_count: double, payment_type: bigint, store_and_fwd_flag: double, tip_amount: double, tolls_amount: double, total_amount: double, tpep_dropoff_datetime: bigint, tpep_pickup_datetime: bigint, trip_distance: double, trip_id: int]

In [24]:
def DIM_Vendor(Final_NYC):
    dim_vendor= Final_NYC \
        .withColumn(
        "vendor_name"
        ,when(col("VendorID") == 1, "Creative Mobile Technologies")
        .when(col("VendorID") == 2, "VeriFone Inc")
    ).select("VendorID", 
             "vendor_name").distinct()
    return dim_vendor

In [20]:
def Dim_Rate(Final_NYC):
    dim_rate = Final_NYC \
    .withColumn(  "Rate"
        ,when(col("RatecodeID") == 1, "Standard rate")
        .when(col("RatecodeID") == 2, "JFK")
        .when(col("RatecodeID") == 3, "Newark")
        .when(col("RatecodeID") == 4, "Nassau or Westchester")
        .when(col("RatecodeID") == 5, "Negotiated fare")
        .when(col("RatecodeID") == 6, "Group ride")
        .otherwise("Unknown")
    ).select("RatecodeID",
             "Rate").distinct()
    return dim_rate

In [22]:
def Dim_Payment(Final_NYC):
    dim_payment = Final_NYC \
    .withColumnRenamed("payment_type" , "payment_type_id"
   ).withColumn("payment_type"
        ,when(col("payment_type_id") == 1, "Credit card")
        .when(col("payment_type_id") == 2, "Cash")
        .when(col("payment_type_id") == 3, "No charge")
        .when(col("payment_type_id") == 4, "Dispute")
        .when(col("payment_type_id") == 5, "Unknown")
        .when(col("payment_type_id") == 6, "Voided trip")
        .otherwise("Unknown")
    ).select("payment_type_id",
             "payment_type").distinct()
    return dim_payment

In [14]:
def Dim_Dropoff_Date(Final_NYC):
    dim_dropoff_date = Final_NYC \
    .withColumn("tpep_dropoff_date_time", to_timestamp(col("tpep_dropoff_datetime"))) \
    .select(
        col("tpep_dropoff_datetime").alias("dropoff_timestamp"),
        col("tpep_dropoff_date_time").alias("Dropoff_date_time"),
        year(col("tpep_dropoff_date_time")).alias("Dropoff_Year"),
        month(col("tpep_dropoff_date_time")).alias("Dropoff_Month"),
        dayofmonth(col("tpep_dropoff_date_time")).alias("Dropoff_Day"),
        date_format(col("tpep_dropoff_date_time"), 'EEEE').alias("Dropoff_Day_of_Week"),
        hour(col("tpep_dropoff_date_time")).alias("Dropoff_Hour")
    ).distinct()
    return dim_dropoff_date

In [11]:
def Dim_Pickup_Date(Final_NYC):
    dim_pickup_date = Final_NYC \
    .withColumn("tpep_pickup_date_time", to_timestamp(col("tpep_pickup_datetime"))) \
    .select(
        col("tpep_pickup_datetime").alias("pick_up_timestamp"),
        col("tpep_pickup_date_time").alias("Pickup_date_time"),
        year(col("tpep_pickup_date_time")).alias("Pickup_Year"),
        month(col("tpep_pickup_date_time")).alias("Pickup_Month"),
        dayofmonth(col("tpep_pickup_date_time")).alias("Pickup_Day"),
        date_format(col("tpep_pickup_date_time"), 'EEEE').alias("Pickup_Day_of_Week"),
        hour(col("tpep_pickup_date_time")).alias("Pickup_Hour")
    ).distinct()
    return dim_pickup_date

In [10]:
def Fact_trips(Final_NYC):
    fact_trips = Final_NYC \
        .select(
        col("trip_id"),
        col("RatecodeID"),
        col("VendorID"),
        col("payment_type").alias("payment_type_id"),
        col("Airport_fee"),
        col("congestion_surcharge"),
        col("extra"),
        col("fare_amount"),
        col("improvement_surcharge"),
        col("mta_tax"),
        col("passenger_count"),
        col("tip_amount"),
        col("tolls_amount"),
        col("total_amount"),
        col("trip_distance"),
        col("tpep_dropoff_datetime").alias("dropoff_datetime"),
        col("tpep_pickup_datetime").alias("pickup_datetime")
    )
    return fact_trips

In [11]:
fact_trips = Fact_trips(Final_NYC)

fact_trips \
    .write \
    .mode("append") \
    .saveAsTable("nyc_dwh.fact_trips")

2024-10-18 01:17:01,401 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2024-10-18 01:17:01,402 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2024-10-18 01:27:02,540 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
2024-10-18 01:27:03,063 WARN conf.HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
2024-10-18 01:27:03,063 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2024-10-18 01:27:03,063 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2024-10-18 01:27:04,532 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


'\nfact_trips.coalesce(20)     .write     .mode("overwrite")     .parquet("file:///home/student/NYC/Fact_trips.parquet")\n'

In [21]:
dim_rate = Dim_Rate(Final_NYC)

dim_rate \
    .write \
    .mode("append") \
    .saveAsTable("nyc_dwh.dim_rate")

                                                                                

In [23]:
dim_payment = Dim_Payment(Final_NYC)

dim_payment \
    .write \
    .mode("append") \
    .saveAsTable("nyc_dwh.dim_payment")

                                                                                

In [26]:
dim_vendor = DIM_Vendor(Final_NYC)

dim_vendor \
    .write \
    .mode("append") \
    .saveAsTable("nyc_dwh.dim_vendor")

                                                                                

In [12]:
dim_pickup_date = Dim_Pickup_Date(Final_NYC)

dim_pickup_date \
    .write \
    .mode("append") \
    .saveAsTable("nyc_dwh.dim_pickup_date")

2024-10-19 23:55:05,021 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2024-10-19 23:55:05,021 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2024-10-20 00:04:59,488 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
2024-10-20 00:05:00,127 WARN conf.HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
2024-10-20 00:05:00,127 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2024-10-20 00:05:00,127 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2024-10-20 00:05:02,161 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
                                                                                

In [15]:
dim_dropoff_date = Dim_Dropoff_Date(Final_NYC)

dim_dropoff_date \
    .write \
    .mode("append") \
    .saveAsTable("nyc_dwh.dim_dropoff_date")

                                                                                