In [28]:
import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

# Create (or reuse) the Spark session for this notebook. The PostgreSQL JDBC
# jar is added to the driver classpath through spark.driver.extraClassPath.
spark = (
    SparkSession.builder
    .appName("Spark project")
    .config(
        'spark.driver.extraClassPath',
        '/usr/lib/jvm/java-17-openjdk-amd64/lib/postgresql-42.6.0.jar',
    )
    .getOrCreate()
)

Import all the cleaned DataFrames from PostgreSQL.

In [29]:
# Names of the cleaned tables to load; each one is read into its own DataFrame.
table_names = ["customer_df","seller_df","geolocation_df","order_item_df","order_payment_df","orders_df","product_df","reviews_df"]

# Connection settings are taken from the environment when available so that
# credentials do not have to be edited into (or committed with) the notebook.
# The fallbacks are the original local-development values, kept only so that
# existing setups keep working unchanged.
jdbc_url = os.environ.get("PG_JDBC_URL", "jdbc:postgresql://localhost:5432/postgres")
connection_properties = {
    "user": os.environ.get("PGUSER", "nikita"),
    "password": os.environ.get("PGPASSWORD", "1234"),
    "driver": "org.postgresql.Driver"
}

# Map each table name to the DataFrame read from that table over JDBC.
table_dataframes = {
    table_name: spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
    for table_name in table_names
}

Extract the individual DataFrames from the dictionary.

In [30]:
# Bind every loaded table to its own variable. The key tuple and the target
# tuple are listed in the same order, so each name receives the table of the
# same name.
(
    customer_df,
    seller_df,
    geolocation_df,
    order_item_df,
    order_payment_df,
    orders_df,
    product_df,
    reviews_df,
) = (
    table_dataframes[key]
    for key in (
        "customer_df",
        "seller_df",
        "geolocation_df",
        "order_item_df",
        "order_payment_df",
        "orders_df",
        "product_df",
        "reviews_df",
    )
)

### Metrics 1 
Find the distance between the customer and the seller for each order.
Then find the average distance over which the sellers of each state have sold their products, i.e. the average radius covered by each state's sellers.

In [31]:
# The geolocation table is needed twice (once for the customer location and
# once for the seller location), so build two projections of it whose columns
# carry a _cust / _sell suffix and can be told apart after joining.
geoloacation_df_cust = geolocation_df.select(
    f.col('geolocation_zip_code_prefix').alias('geolocation_zip_code_prefix_cust'),
    f.col('geolocation_lat').alias('geolocation_lat_cust'),
    f.col('geolocation_lng').alias('geolocation_lng_cust'),
)
geoloacation_df_sell = geolocation_df.select(
    f.col('geolocation_zip_code_prefix').alias('geolocation_zip_code_prefix_sell'),
    f.col('geolocation_lat').alias('geolocation_lat_sell'),
    f.col('geolocation_lng').alias('geolocation_lng_sell'),
)

# Join type shared by the joins that use these projections.
jointype = 'inner'

In [32]:
# Mark both geolocation projections with a broadcast hint for the joins below.
geoloacation_df_cust_broadcast = f.broadcast(geoloacation_df_cust)
geoloacation_df_sell_broadcast = f.broadcast(geoloacation_df_sell)

# Build one row per order item carrying both end points of the delivery:
#   customer -> customer coordinates -> orders -> order items -> seller
#   -> seller coordinates.
single_joined_table = (
    customer_df.alias('c')
    .join(
        geoloacation_df_cust_broadcast.alias('gc'),
        on=f.col('c.customer_zip_code_prefix') == f.col('gc.geolocation_zip_code_prefix_cust'),
        how=jointype,
    )
    # Keep only the customer key and coordinates before the remaining joins.
    .select([
        "c.customer_id",
        "c.customer_zip_code_prefix",
        "gc.geolocation_lat_cust",
        "gc.geolocation_lng_cust",
    ])
    .join(
        orders_df.alias('o'),
        on=f.col('c.customer_id') == f.col('o.customer_id'),
        how=jointype,
    )
    .join(
        order_item_df.alias('oi'),
        on=f.col('o.order_id') == f.col('oi.order_id'),
        how=jointype,
    )
    .join(
        seller_df.alias('s'),
        on=f.col('oi.seller_id') == f.col('s.seller_id'),
        how=jointype,
    )
    .join(
        geoloacation_df_sell_broadcast.alias('gs'),
        on=f.col('s.seller_zip_code_prefix') == f.col('gs.geolocation_zip_code_prefix_sell'),
        how=jointype,
    )
    # Final projection: identifiers plus customer-side and seller-side location.
    .select([
        "o.order_id",
        "oi.product_id",
        "c.customer_id",
        "c.customer_zip_code_prefix",
        "gc.geolocation_lat_cust",
        "gc.geolocation_lng_cust",
        "oi.seller_id",
        "s.seller_zip_code_prefix",
        "s.seller_state",
        "gs.geolocation_lat_sell",
        "gs.geolocation_lng_sell",
    ])
)



In [34]:
# Remove rows that are identical across all selected columns.
single_joined_table = single_joined_table.distinct()

In [37]:
# Great-circle distance between customer and seller using the spherical law of
# cosines:
#     d = R * acos(sin(lat1) * sin(lat2) + cos(lat1) * cos(lat2) * cos(lng2 - lng1))
# The coordinates are converted from degrees to radians as column expressions,
# so no temporary columns have to be added and dropped again.
lat_cust_rad = f.radians(f.col("geolocation_lat_cust"))
lng_cust_rad = f.radians(f.col("geolocation_lng_cust"))
lat_sell_rad = f.radians(f.col("geolocation_lat_sell"))
lng_sell_rad = f.radians(f.col("geolocation_lng_sell"))

cos_central_angle = (
    f.sin(lat_cust_rad) * f.sin(lat_sell_rad)
    + f.cos(lat_cust_rad) * f.cos(lat_sell_rad) * f.cos(lng_sell_rad - lng_cust_rad)
)

# Floating-point rounding can push the cosine marginally outside [-1, 1]
# (typically when both points coincide), and acos() of such a value is NaN.
# Clamp it back into the valid domain. Null/NaN inputs are passed through
# untouched so that missing coordinates are not turned into a fake distance.
cos_central_angle_clamped = (
    f.when(cos_central_angle.isNull() | f.isnan(cos_central_angle), cos_central_angle)
    .when(cos_central_angle > 1.0, f.lit(1.0))
    .when(cos_central_angle < -1.0, f.lit(-1.0))
    .otherwise(cos_central_angle)
)

single_joined_table = single_joined_table.withColumn(
    "distance_km",
    f.acos(cos_central_angle_clamped) * 6371.0  # Radius of the Earth in kilometers
)

# Show the DataFrame with the distance column
distance_output = single_joined_table.select("order_id","product_id","customer_id","seller_id","distance_km")
distance_output.show()




+--------------------+--------------------+--------------------+--------------------+------------------+
|            order_id|          product_id|         customer_id|           seller_id|       distance_km|
+--------------------+--------------------+--------------------+--------------------+------------------+
|00018f77f2f0320c5...|e5f2d52b802189ee6...|f6dd3ec061db4e398...|dd7ddc04e1b6c2c61...| 585.5639365873883|
|00042b26cf59d7ce6...|ac6c3623068f30de0...|58dbd0b2d70206bf4...|df560393f3a51e745...|  646.163462432375|
|00054e8431b9d7675...|8d4f2bb7e93e6710a...|32e2e6ab09e778d99...|7040e82f899a04d1b...|484.86016437388383|
|0006ec9db01a64e59...|99a4788cb24856965...|5d178120c29c61748...|4a3ca9315b744ce9f...| 566.2993727151662|
|000aed2e25dbad2f9...|4fa33915031a8cde0...|fff5169e583fd07fa...|fe2032dab1a61af87...| 37.93670543869258|
|0015ebb40fb17286b...|50fd2b788dc166edd...|da43a556bf5c36a11...|8b321bb669392f516...| 779.5632051061305|
|001c85b5f68d2be0c...|84f45695836516442...|48ed31e735f1

                                                                                

In [39]:
# Reference value: the mean customer-seller distance for sellers located in
# SP, taken over rows whose distance is non-null and positive.
sp_df = (
    single_joined_table
    .where(f.col("seller_state") == "SP")
    .where(f.col("distance_km").isNotNull())
    .where(f.col("distance_km") > 0)
)

# The distance is cast to int (whole kilometres) before averaging.
distance_sp = sp_df.withColumn('distance_km', f.col('distance_km').cast('int'))

state_analysis1 = distance_sp.agg(f.avg("distance_km").alias("avg_distance_km"))

# The global aggregate yields a single row; pull the scalar out of it.
avg_distance_km = state_analysis1.first()["avg_distance_km"]

                                                                                

In [40]:
# Per seller state: the average customer-seller distance and the number of
# products sold.
average_distance_by_state = single_joined_table.groupBy("seller_state")

state_analysis = average_distance_by_state.agg(
    # Average only over usable distances. A single NaN distance would otherwise
    # turn the whole state's average into NaN, and the fill below would then
    # replace that state's real average with the SP reference value. Rows that
    # do not match the when() become null, which avg() ignores.
    f.avg(
        f.when(~f.isnan(f.col("distance_km")), f.col("distance_km"))
    ).alias("avg_distance_km"),
    f.count("product_id").alias("product_count")
)

# Fallback for states that have no usable distance at all: use the SP
# reference average computed earlier in the notebook.
state_analysis = state_analysis.na.fill(avg_distance_km, subset=["avg_distance_km"])

state_analysis.show()




+------------+------------------+-------------+
|seller_state|   avg_distance_km|product_count|
+------------+------------------+-------------+
|          SC| 753.7405041684843|         3740|
|          RO|2346.7543054473085|           14|
|          AM| 2327.352004471721|            3|
|          GO| 853.2263712439288|          479|
|          MT|1362.8805506454491|          140|
|          SP| 542.2997957662903|        72474|
|          ES| 813.4300862053252|          322|
|          PB|1467.8169404798093|           37|
|          RS| 987.2585178058513|         2011|
|          MS|1025.6576592571016|           48|
|          MG| 599.5672389875979|         8058|
|          BA|1121.6669708294878|          566|
|          PE|1582.9759712125626|          408|
|          CE|1655.7044046332737|           89|
|          RN| 946.9384882129613|           54|
|          RJ| 582.9235003097515|         4396|
|          MA|1939.3039258196295|          395|
|          DF| 852.2064586337052|       

                                                                                

In [41]:
# Persist the per-state result to PostgreSQL, replacing any existing table.
# The connection settings defined when the tables were loaded (jdbc_url,
# connection_properties: user, password, driver) are reused here instead of
# repeating the URL and credentials inline.
(
    state_analysis.write
    .format('jdbc')
    .options(url=jdbc_url, dbtable='seller_average_distance', **connection_properties)
    .mode('overwrite')
    .save()
)

### Metrics 2

In [42]:

# Attach payments and product attributes to the order/product/seller rows.
# NOTE(review): single_joined_table appears to hold one row per order and
# product, while payment_value looks like an order-level amount; if so, orders
# with several products contribute their full payment once per product.
# Confirm this is the intended revenue definition.
joined_df = (
    order_payment_df
    .join(single_joined_table, on="order_id", how="inner")
    .join(
        product_df.alias('p'),
        on=single_joined_table['product_id'] == product_df['product_id'],
    )
)

# Treat a missing payment value as zero revenue.
joined_df = joined_df.withColumn(
    "payment_value",
    f.coalesce(f.col("payment_value"), f.lit(0)),
)

# Total revenue for every (seller state, product category) pair.
revenue_by_product_state = joined_df.groupBy(
    "seller_state", "product_category_name"
).agg(
    f.sum("payment_value").alias("total_revenue")
)

# One row per product category, one column per seller state. Each
# (category, state) pair has exactly one total after the aggregation above,
# so first() simply picks that value.
pivot_table = revenue_by_product_state.groupBy(
    "product_category_name"
).pivot(
    "seller_state"
).agg(
    f.first("total_revenue")
)

# Category/state combinations without any sales are shown as 0.
pivot_table1 = pivot_table.na.fill(0)
pivot_table1.show()





+---------------------+---+---+------------------+-------+------------------+------------------+-----------------+---+------------------+-----------------+-------+---+------------------+------------------+------+------------------+------------------+------+-------+------------------+------------------+-----------------+------------------+
|product_category_name| AC| AM|                BA|     CE|                DF|                ES|               GO| MA|                MG|               MS|     MT| PA|                PB|                PE|    PI|                PR|                RJ|    RN|     RO|                RS|                SC|               SE|                SP|
+---------------------+---+---+------------------+-------+------------------+------------------+-----------------+---+------------------+-----------------+-------+---+------------------+------------------+------+------------------+------------------+------+-------+------------------+------------------+---------------

                                                                                

In [43]:
# Persist the revenue-by-category pivot to PostgreSQL, replacing any existing
# table. The connection settings defined when the tables were loaded
# (jdbc_url, connection_properties: user, password, driver) are reused here
# instead of repeating the URL and credentials inline.
(
    pivot_table1.write
    .format('jdbc')
    .options(url=jdbc_url, dbtable='Question_2_pivot_1', **connection_properties)
    .mode('overwrite')
    .save()
)

In [44]:
from pyspark.sql.window import Window

# Total revenue for every (seller state, product category) pair.
revenue_by_product_state = joined_df.groupBy(
    "seller_state", "product_category_name"
).agg(
    f.sum("payment_value").alias("total_revenue")
)

# Window covering all rows of one seller state.
window_spec = Window.partitionBy("seller_state")

revenue_by_product_state = (
    revenue_by_product_state
    # Revenue of the whole state (summed over all of its categories), repeated
    # on every row of that state.
    .withColumn(
        "total_revenue_category_state",
        f.sum("total_revenue").over(window_spec),
    )
    # Share of the state's revenue that comes from this category, in percent.
    .withColumn(
        "percentage_of_revenue",
        f.col("total_revenue") / f.col("total_revenue_category_state") * 100,
    )
)

# One row per seller state, one column per product category, holding the
# category's percentage of that state's revenue.
pivot_table = revenue_by_product_state.groupBy(
    "seller_state"
).pivot(
    "product_category_name"
).agg(
    f.first("percentage_of_revenue")
)

# Categories with no sales in a state are shown as 0.
pivot_table2 = pivot_table.na.fill(0)

pivot_table2.show()


+------------+--------------------------+-------------------+--------------------+---------------------+--------------------+------------------+------------------+------------------+----------------------+-------------------+--------------------+-----------------+-------------------+--------------------+------------------+---------------------+-------------------+-------------------------------+-------------------------+-------------------------+-------------------+------------------------+-----------------------+-------------------+--------------------+-------------------+--------------------+----------------------+------------------------+-------------------------+---------------------+-------------------+--------------------+-----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------------------+------------------+-------------------+-----------------

In [45]:
# Persist the revenue-percentage pivot to PostgreSQL, replacing any existing
# table. The connection settings defined when the tables were loaded
# (jdbc_url, connection_properties: user, password, driver) are reused here
# instead of repeating the URL and credentials inline.
(
    pivot_table2.write
    .format('jdbc')
    .options(url=jdbc_url, dbtable='Question_2_pivot_2', **connection_properties)
    .mode('overwrite')
    .save()
)

In [46]:
# Stop the Spark session created at the top of the notebook.
spark.stop()