## Loading Data




In [None]:
from pyspark.sql import SparkSession

# Reuse the session supplied by the notebook runtime when there is one;
# otherwise create it, so this cell does not rely on an implicit `spark` global.
spark = SparkSession.builder.getOrCreate()

# Location of the raw IPL summary CSV (relative to the notebook's default file root)
file_path = 'Files/ipldata/ipl_summary_raw.csv'

# Read the CSV into a DataFrame: the first row supplies the column names and
# column types are inferred from the data.
df = spark.read.csv(file_path, header=True, inferSchema=True)

## Data Partitioning

- **Data partitioning** refers to dividing the data into smaller chunks based on the values of one or more columns.
- It improves query performance by allowing Spark to skip irrelevant partitions (partition pruning), thereby reducing the amount of data to scan.
- This is especially useful for large datasets where queries or operations frequently run on a subset of the data.

**Example: Partitioning by `info_season`:**

In [None]:
# Shuffle rows so that all records of a season land in the same in-memory
# partition; this keeps the number of files written per season small.
df_partitioned = df.repartition("info_season")

# Save as Parquet, partitioned on disk by 'info_season'.
# - partitionBy() is what creates the `info_season=<value>` sub-folders that let
#   Spark skip irrelevant seasons at read time; repartition() alone does not.
# - mode("overwrite") makes the cell safe to re-run (the default mode raises an
#   error when the target path already exists).
(
    df_partitioned.write
    .partitionBy("info_season")
    .mode("overwrite")
    .parquet("Files/data/ipl_partitioned")
)



In [None]:
# Shuffle rows so that all records of a season land in the same in-memory
# partition; this keeps the number of files written per season small.
df_partitioned = df.repartition("info_season")

# Save as a managed table, partitioned on disk by 'info_season'.
# - partitionBy() records 'info_season' as the table's partition column;
#   repartition() alone only affects the in-memory layout.
# - mode("overwrite") makes the cell safe to re-run (the default mode raises an
#   error when the table already exists).
(
    df_partitioned.write
    .partitionBy("info_season")
    .mode("overwrite")
    .saveAsTable("ipl_partitioned_table")
)


In [None]:
# Shuffle rows so that all records of a season land in the same in-memory
# partition; this keeps the number of files written per season small.
df_partitioned = df.repartition("info_season")

# Save as a Delta table, partitioned on disk by 'info_season'.
# - partitionBy() records 'info_season' as the table's partition column;
#   repartition() alone only affects the in-memory layout.
# - mode("overwrite") makes the cell safe to re-run (the default mode raises an
#   error when the table already exists).
# - overwriteSchema lets the overwrite replace a table that was previously
#   created with a different (e.g. unpartitioned) layout.
(
    df_partitioned.write
    .format("delta")
    .partitionBy("info_season")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("ipl_partitioned_delta_table")
)


## Saving & Updating DataFrames 

In [13]:
### Sample data for City Dimension Table

from pyspark.sql import Row
from pyspark.sql.functions import when

# Sample data for City Dimension Table
city_dimension_data = [
    Row(info_city="Hyderabad", city_population=10000000, city_state="Telangana", city_country="India"),
    Row(info_city="Bengaluru", city_population=12000000, city_state="Karnataka", city_country="India"),
    Row(info_city="Chennai", city_population=7000000, city_state="Tamil Nadu", city_country="India"),
]

# Create City Dimension DataFrame
city_dimension_df = spark.createDataFrame(city_dimension_data)
city_dimension_df.show()

# Replace "Bangalore" with "Bengaluru" in the info_city column so the match data
# uses the same spelling as the dimension table. This must happen before the
# join, otherwise "Bangalore" rows would not find their dimension row.
df_replaced = df.withColumn(
    "info_city",
    when(df.info_city == "Bangalore", "Bengaluru").otherwise(df.info_city),
)

# Left join on info_city: every match row is kept, and city_state is null for
# cities that are not present in the dimension table.
left_joined_df = (
    df_replaced
    .join(city_dimension_df, on="info_city", how="left")
    .select("info_city", "info_outcome_winner", "city_state")
)

# Show the result of the left join
left_joined_df.show(truncate=False)

# Persist both DataFrames as tables. mode("overwrite") keeps the cell re-runnable;
# the default mode raises an error when the table already exists.
city_dimension_df.write.mode("overwrite").saveAsTable("city_dimension")
left_joined_df.write.mode("overwrite").saveAsTable("winner_by_state")


StatementMeta(, 9e507be7-9117-441f-a87a-608bc35f5836, 15, Finished, Available, Finished)

+---------+---------------+----------+------------+
|info_city|city_population|city_state|city_country|
+---------+---------------+----------+------------+
|Hyderabad|       10000000| Telangana|       India|
|Bengaluru|       12000000| Karnataka|       India|
|  Chennai|        7000000|Tamil Nadu|       India|
+---------+---------------+----------+------------+

+---------+---------------------------+----------+
|info_city|info_outcome_winner        |city_state|
+---------+---------------------------+----------+
|Mumbai   |Mumbai Indians             |null      |
|Mumbai   |Mumbai Indians             |null      |
|Mumbai   |Mumbai Indians             |null      |
|Kolkata  |Kolkata Knight Riders      |null      |
|Kolkata  |Kolkata Knight Riders      |null      |
|Pune     |Rising Pune Supergiant     |null      |
|Pune     |Delhi Daredevils           |null      |
|Delhi    |Delhi Daredevils           |null      |
|Delhi    |Kolkata Knight Riders      |null      |
|Bengaluru|Royal Challe

In [15]:
# Read the saved city_dimension table back and display it to confirm the write.
city_dimension_saved = spark.sql("SELECT * FROM city_dimension")
city_dimension_saved.show()

StatementMeta(, 9e507be7-9117-441f-a87a-608bc35f5836, 17, Finished, Available, Finished)

+---------+---------------+----------+------------+
|info_city|city_population|city_state|city_country|
+---------+---------------+----------+------------+
|Hyderabad|       10000000| Telangana|       India|
|Bengaluru|       12000000| Karnataka|       India|
|  Chennai|        7000000|Tamil Nadu|       India|
+---------+---------------+----------+------------+



In [24]:
# Read the Chennai rows back from the saved winner_by_state table and display
# them to confirm the write.
chennai_winners = spark.sql("SELECT * FROM winner_by_state WHERE info_city ='Chennai'")
chennai_winners.show()

StatementMeta(, 9e507be7-9117-441f-a87a-608bc35f5836, 26, Finished, Available, Finished)

+---------+--------------------+----------+
|info_city| info_outcome_winner|city_state|
+---------+--------------------+----------+
|  Chennai| Chennai Super Kings|Tamil Nadu|
|  Chennai| Chennai Super Kings|Tamil Nadu|
|  Chennai| Chennai Super Kings|Tamil Nadu|
|  Chennai| Chennai Super Kings|Tamil Nadu|
|  Chennai| Chennai Super Kings|Tamil Nadu|
|  Chennai| Chennai Super Kings|Tamil Nadu|
|  Chennai|      Mumbai Indians|Tamil Nadu|
|  Chennai| Chennai Super Kings|Tamil Nadu|
|  Chennai|      Mumbai Indians|Tamil Nadu|
|  Chennai|Royal Challengers...|Tamil Nadu|
|  Chennai|Kolkata Knight Ri...|Tamil Nadu|
|  Chennai|      Mumbai Indians|Tamil Nadu|
|  Chennai|Royal Challengers...|Tamil Nadu|
|  Chennai|      Mumbai Indians|Tamil Nadu|
|  Chennai|Royal Challengers...|Tamil Nadu|
|  Chennai|      Delhi Capitals|Tamil Nadu|
|  Chennai| Sunrisers Hyderabad|Tamil Nadu|
|  Chennai|        Punjab Kings|Tamil Nadu|
|  Chennai|                null|Tamil Nadu|
|  Chennai| Chennai Super Kings|

In [21]:
### Update Sample data for City Dimension Table

from pyspark.sql import Row

# Row factory fixing the column order of the city dimension records.
CityRow = Row("info_city", "city_population", "city_state", "city_country")

# Extended sample data: the original three cities plus Mumbai and Chandigarh.
city_dimension_data = [
    CityRow("Hyderabad", 10000000, "Telangana", "India"),
    CityRow("Bengaluru", 12000000, "Karnataka", "India"),
    CityRow("Chennai", 7000000, "Tamil Nadu", "India"),
    CityRow("Mumbai", 7000000, "Maharashtra", "India"),
    CityRow("Chandigarh", 7000000, "Punjab", "India"),
]

# Rebuild the City Dimension DataFrame from the extended records and preview it.
city_dimension_df = spark.createDataFrame(city_dimension_data)
city_dimension_df.show()

StatementMeta(, 9e507be7-9117-441f-a87a-608bc35f5836, 23, Finished, Available, Finished)

+----------+---------------+-----------+------------+
| info_city|city_population| city_state|city_country|
+----------+---------------+-----------+------------+
| Hyderabad|       10000000|  Telangana|       India|
| Bengaluru|       12000000|  Karnataka|       India|
|   Chennai|        7000000| Tamil Nadu|       India|
|    Mumbai|        7000000|Maharashtra|       India|
|Chandigarh|        7000000|     Punjab|       India|
+----------+---------------+-----------+------------+



In [22]:
# Replace the contents of the saved city_dimension table with the updated DataFrame.
(
    city_dimension_df.write
    .mode("overwrite")
    .saveAsTable("city_dimension")
)

StatementMeta(, 9e507be7-9117-441f-a87a-608bc35f5836, 24, Finished, Available, Finished)

In [25]:
from pyspark.sql.functions import when

# Replace "Bangalore" with "Bengaluru" in the info_city column so the match data
# uses the same spelling as the dimension table. This must happen before the
# join, otherwise "Bangalore" rows would not find their dimension row.
df_replaced = df.withColumn(
    "info_city",
    when(df.info_city == "Bangalore", "Bengaluru").otherwise(df.info_city),
)

# Left join on info_city against the updated dimension DataFrame: every match row
# is kept, and city_state is null for cities that are still missing from it.
left_joined_df = (
    df_replaced
    .join(city_dimension_df, on="info_city", how="left")
    .select("info_city", "info_outcome_winner", "city_state")
)

# Show the result of the left join, then replace the saved table with it.
left_joined_df.show(truncate=False)
left_joined_df.write.mode("overwrite").saveAsTable("winner_by_state")

StatementMeta(, 9e507be7-9117-441f-a87a-608bc35f5836, 27, Finished, Available, Finished)

+---------+---------------------------+-----------+
|info_city|info_outcome_winner        |city_state |
+---------+---------------------------+-----------+
|Mumbai   |Mumbai Indians             |Maharashtra|
|Mumbai   |Mumbai Indians             |Maharashtra|
|Mumbai   |Mumbai Indians             |Maharashtra|
|Kolkata  |Kolkata Knight Riders      |null       |
|Kolkata  |Kolkata Knight Riders      |null       |
|Pune     |Rising Pune Supergiant     |null       |
|Pune     |Delhi Daredevils           |null       |
|Delhi    |Delhi Daredevils           |null       |
|Delhi    |Kolkata Knight Riders      |null       |
|Bengaluru|Royal Challengers Bangalore|Karnataka  |
|Bengaluru|Mumbai Indians             |Karnataka  |
|Bengaluru|Rising Pune Supergiant     |Karnataka  |
|Hyderabad|Sunrisers Hyderabad        |Telangana  |
|Hyderabad|Sunrisers Hyderabad        |Telangana  |
|Hyderabad|Sunrisers Hyderabad        |Telangana  |
|Hyderabad|Sunrisers Hyderabad        |Telangana  |
|Rajkot   |K

In [27]:
# Read the Mumbai rows back from the saved winner_by_state table and display
# them to confirm that the overwrite populated city_state.
mumbai_winners = spark.sql("SELECT * FROM winner_by_state WHERE info_city ='Mumbai'")
mumbai_winners.show()

StatementMeta(, 9e507be7-9117-441f-a87a-608bc35f5836, 29, Finished, Available, Finished)

+---------+--------------------+-----------+
|info_city| info_outcome_winner| city_state|
+---------+--------------------+-----------+
|   Mumbai|      Mumbai Indians|Maharashtra|
|   Mumbai|      Mumbai Indians|Maharashtra|
|   Mumbai|      Mumbai Indians|Maharashtra|
|   Mumbai|      Mumbai Indians|Maharashtra|
|   Mumbai|Rising Pune Super...|Maharashtra|
|   Mumbai|      Mumbai Indians|Maharashtra|
|   Mumbai|     Kings XI Punjab|Maharashtra|
|   Mumbai|Rising Pune Super...|Maharashtra|
|   Mumbai| Chennai Super Kings|Maharashtra|
|   Mumbai|    Delhi Daredevils|Maharashtra|
|   Mumbai|      Mumbai Indians|Maharashtra|
|   Mumbai| Sunrisers Hyderabad|Maharashtra|
|   Mumbai|      Mumbai Indians|Maharashtra|
|   Mumbai|    Rajasthan Royals|Maharashtra|
|   Mumbai|      Mumbai Indians|Maharashtra|
|   Mumbai| Chennai Super Kings|Maharashtra|
|   Mumbai| Chennai Super Kings|Maharashtra|
|   Mumbai|      Delhi Capitals|Maharashtra|
|   Mumbai|      Mumbai Indians|Maharashtra|
|   Mumbai