<a href="https://colab.research.google.com/github/shaguftah10sep/assignments/blob/main/python_retail_code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=50536d1d88a593ffe438abd0aa8f81427d76b9e05a26968446f27963cee15639
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [4]:
from google.colab import files
uploaded = files.upload()


Saving transactions.parquet to transactions (1).parquet
Saving umd_vip_to_profile_mapping.csv to umd_vip_to_profile_mapping (1).csv
Saving vips_2020-11-01.csv to vips_2020-11-01 (1).csv
Saving vips_2020-11-15.csv to vips_2020-11-15 (1).csv
Saving vips_2020-11-25.csv to vips_2020-11-25 (1).csv


In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum as spark_sum

# Create a Spark session
spark = SparkSession.builder \
    .appName("VIP Sales Overview") \
    .getOrCreate()

# Load the CSV files
vip_20201101 = spark.read.csv(r'vips_2020-11-01.csv', sep=',', header=True, inferSchema=True)
vip_20201115 = spark.read.csv(r'vips_2020-11-15.csv', sep=',', header=True, inferSchema=True)
vip_20201125 = spark.read.csv(r'vips_2020-11-25.csv', sep=',', header=True, inferSchema=True)

# Load VIP to profile mapping
vip_profile_mapping = spark.read.csv(r'umd_vip_to_profile_mapping.csv', sep=';', header=True, inferSchema=True)

# Print schemas to verify column names
print("VIP 2020-11-01 Schema:")
vip_20201101.printSchema()

print("VIP Profile Mapping Schema:")
vip_profile_mapping.printSchema()


# Combine VIP snapshots
vips = vip_20201101.union(vip_20201115).union(vip_20201125)

# Check the schema of the combined DataFrame
print("Combined VIP Schema:")
vips.printSchema()

#  Filter VIPs from the Netherlands
vips_nl = vips.filter(col('country') == 'The Netherlands')

# Merge VIP data with profile mapping
# Ensure the column names match
vip_data = vips_nl.join(vip_profile_mapping, on='vip_id', how='inner')

# Filter active VIPs
vip_data_active = vip_data.filter(col('active') == 'yes')

# Load transaction data (parquet)
transactions = spark.read.parquet('transactions.parquet')

# Filter valid transactions
valid_transactions = transactions.filter(col('cancellation_flag').isNull())

# Calculate total sales
valid_transactions = valid_transactions.withColumn(
    'total_sales',
    (col('quantity') * col('recommended_retail_price_per_unit')) - col('discount_amount_per_unit')
)

#  Group by profile_id and calculate total sales per VIP
total_sales_per_vip = valid_transactions.groupBy('profile_id').agg(spark_sum('total_sales').alias('total_sales'))

#  Merge VIP data with total sales
vip_sales = vip_data_active.join(total_sales_per_vip, on='profile_id', how='left').fillna(0)

# Select relevant columns
output = vip_sales.select('first_name', 'last_name', 'email', 'total_sales')

# Sort by total sales in descending order
output = output.orderBy(col('total_sales').desc())

# Show the output
output.show()


VIP 2020-11-01 Schema:
root
 |-- vip_id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- vip_type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- email: string (nullable = true)

VIP Profile Mapping Schema:
root
 |-- vip_id: long (nullable = true)
 |-- profile_id: string (nullable = true)
 |-- active: string (nullable = true)
 |-- meta_change_date: string (nullable = true)
 |-- meta_comments: string (nullable = true)

Combined VIP Schema:
root
 |-- vip_id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- vip_type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- email: string (nullable = true)

+----------+-------------+--------------------+-----------+
|first_name|    last_name|               email|total_sales|
+----------+-------------+--------------------+-----------+
|   Kristin|       Brooks|  Kristin@Brooks.com|    