### TASK 1 : USE PYSPARK TO CLEAN AND PREPROCESS A LARGE DATASET, HANDLING MISSING VALUES AND DUPLICATES.
### DELIVERABLE: A PYTHON SCRIPT OR NOTEBOOK SHOWCASING THE DATA CLEANING PROCESS.

In [None]:
# Install PySpark
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, isnan

In [None]:
#Create a SparkSession

spark = SparkSession.builder \
    .appName("DataCleaning") \
    .getOrCreate()

In [None]:
#Load the Dataset

df = spark.read.csv("/content/Dataset .csv", header=True, inferSchema=True)
df.printSchema()
df.show(5)

root
 |-- Restaurant ID: string (nullable = true)
 |-- Restaurant Name: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Locality: string (nullable = true)
 |-- Locality Verbose: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Cuisines: string (nullable = true)
 |-- Average Cost for two: string (nullable = true)
 |-- Currency: string (nullable = true)
 |-- Has Table booking: string (nullable = true)
 |-- Has Online delivery: string (nullable = true)
 |-- Is delivering now: string (nullable = true)
 |-- Switch to order menu: string (nullable = true)
 |-- Price range: string (nullable = true)
 |-- Aggregate rating: string (nullable = true)
 |-- Rating color: string (nullable = true)
 |-- Rating text: string (nullable = true)
 |-- Votes: integer (nullable = true)

+-------------+--------------------+------------+----------------

In [None]:
# Count of missing/null values in each column
df.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns]).show()

+-------------+---------------+------------+----+-------+--------+----------------+---------+--------+--------+--------------------+--------+-----------------+-------------------+-----------------+--------------------+-----------+----------------+------------+-----------+-----+
|Restaurant ID|Restaurant Name|Country Code|City|Address|Locality|Locality Verbose|Longitude|Latitude|Cuisines|Average Cost for two|Currency|Has Table booking|Has Online delivery|Is delivering now|Switch to order menu|Price range|Aggregate rating|Rating color|Rating text|Votes|
+-------------+---------------+------------+----+-------+--------+----------------+---------+--------+--------+--------------------+--------+-----------------+-------------------+-----------------+--------------------+-----------+----------------+------------+-----------+-----+
|            0|              0|           0|   0|      0|       6|               6|        6|       6|      15|                   6|       6|                6|    

In [None]:
#Drop rows with any nulls
df_clean = df.dropna()

In [None]:
#Remove Duplicates
df_clean = df_clean.dropDuplicates()

In [None]:
#Save or Show the Cleaned Data
df_clean.show(10)  # View first 10 cleaned records

# Optionally save to new CSV
df_clean.coalesce(1).write.csv("cleaned_dataset.csv", header=True)

+-------------+-----------------+------------+---------+--------------------+--------------------+--------------------+-----------+-----------+--------------------+--------------------+------------------+-----------------+-------------------+-----------------+--------------------+-----------+----------------+------------+-----------+-----+
|Restaurant ID|  Restaurant Name|Country Code|     City|             Address|            Locality|    Locality Verbose|  Longitude|   Latitude|            Cuisines|Average Cost for two|          Currency|Has Table booking|Has Online delivery|Is delivering now|Switch to order menu|Price range|Aggregate rating|Rating color|Rating text|Votes|
+-------------+-----------------+------------+---------+--------------------+--------------------+--------------------+-----------+-----------+--------------------+--------------------+------------------+-----------------+-------------------+-----------------+--------------------+-----------+----------------+------

### TASK 2 : USE APACHE SPARK TO ANALYZE A LARGE DATASET, IMPLEMENTING OPERATIONS LIKE FILTERING, GROUPING, AND AGGREGATIONS.

### DELIVERABLE: A SPARK JOB SCRIPT WITH OUTPUT SHOWING ANALYSIS RESULTS.

In [None]:
#Setup and Import
from pyspark.sql.functions import col, count, avg, sum, max, min

In [None]:
#Create SparkSession
spark = SparkSession.builder \
    .appName("SparkDataAnalysis") \
    .getOrCreate()


In [None]:
# Load Dataset
df = spark.read.csv("Dataset .csv", header=True, inferSchema=True)
df.show(5)


+-------------+--------------------+------------+----------------+--------------------+--------------------+--------------------+----------+---------+--------------------+--------------------+----------------+-----------------+-------------------+-----------------+--------------------+-----------+----------------+------------+-----------+-----+
|Restaurant ID|     Restaurant Name|Country Code|            City|             Address|            Locality|    Locality Verbose| Longitude| Latitude|            Cuisines|Average Cost for two|        Currency|Has Table booking|Has Online delivery|Is delivering now|Switch to order menu|Price range|Aggregate rating|Rating color|Rating text|Votes|
+-------------+--------------------+------------+----------------+--------------------+--------------------+--------------------+----------+---------+--------------------+--------------------+----------------+-----------------+-------------------+-----------------+--------------------+-----------+--------

In [None]:
#Basic Filtering
filtered_df = df.filter(col("Votes") > 100)
filtered_df.show(5)

+-------------+--------------------+------------+----------------+--------------------+--------------------+--------------------+----------+---------+--------------------+--------------------+----------------+-----------------+-------------------+-----------------+--------------------+-----------+----------------+------------+-----------+-----+
|Restaurant ID|     Restaurant Name|Country Code|            City|             Address|            Locality|    Locality Verbose| Longitude| Latitude|            Cuisines|Average Cost for two|        Currency|Has Table booking|Has Online delivery|Is delivering now|Switch to order menu|Price range|Aggregate rating|Rating color|Rating text|Votes|
+-------------+--------------------+------------+----------------+--------------------+--------------------+--------------------+----------+---------+--------------------+--------------------+----------------+-----------------+-------------------+-----------------+--------------------+-----------+--------

In [None]:
# Grouping + Aggregation
# Example: Group by a categorical column (say "category") and count occurrences
grouped_df = df.groupBy("City").agg(count("*").alias("count"))
grouped_df.show()

+--------------------+-----+
|                City|count|
+--------------------+-----+
|           Bangalore|   20|
|           Tangerang|    2|
|               Kochi|   20|
|          Aurangabad|   20|
|           Faridabad|  251|
|            Armidale|    1|
|              Monroe|    1|
|            Savannah|   20|
|          New Delhi"|    1|
|              Mysore|   20|
|Huda City Centre ...|    1|
|           Bras�_lia|   20|
|            Valdosta|   20|
|           Edinburgh|   20|
|           Singapore|   20|
|       San Juan City|    2|
|          Manchester|   20|
|             Jakarta|   16|
|          Beechworth|    1|
|               Patna|   20|
+--------------------+-----+
only showing top 20 rows



In [None]:
# Multiple aggregations
agg_df = df.groupBy("City").agg(
    avg("Votes").alias("avg_votes"),
    sum("Votes").alias("total_votes"),
    max("Votes").alias("max_votes")
)
agg_df.show()

+--------------------+------------------+-----------+---------+
|                City|         avg_votes|total_votes|max_votes|
+--------------------+------------------+-----------+---------+
|           Bangalore|           2805.75|      56115|    10934|
|           Tangerang|            1183.5|       2367|     2212|
|               Kochi|            359.95|       7199|      722|
|          Aurangabad|              64.8|       1296|      240|
|           Faridabad|25.840637450199203|       6486|      799|
|            Armidale|              25.0|         25|       25|
|              Monroe|              65.0|         65|       65|
|            Savannah|            768.85|      15377|     1803|
|          New Delhi"|              NULL|       NULL|     NULL|
|              Mysore|             212.9|       4258|      393|
|Huda City Centre ...|              NULL|       NULL|     NULL|
|           Bras�_lia|             10.85|        217|       30|
|            Valdosta|             210.0

In [None]:
agg_df.coalesce(1).write.csv("aggregated_results.csv", header=True)
