
# iPhone Sales Analysis


#### Milestone 1

In [0]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Get the existing Spark session or create one, named for this analysis.
spark = (
    SparkSession.builder
    .appName("iphone_sales_analysis")
    .getOrCreate()
)

In [0]:
# Location of the product data CSV file (only the path is defined here; it is read later)

product_filepath_csv = "dbfs:/FileStore/shared_uploads/snl.adh97@gmail.com/product_data-1.csv"

# Location of the sales data file, which uses '|' as its delimiter

sales_filepath = "dbfs:/FileStore/shared_uploads/snl.adh97@gmail.com/sales_data_pipe_delimited-1.csv"

# The sales data (10,000 rows) and the product data were generated with ChatGPT to provide a larger dataset to work with.


### API for Sales Data Collector

##### It reads sales data that is pipe-delimited (`|`) and has a header row, writes it to a Hive table in Parquet format partitioned by `sale_date`, and returns the table name.

In [0]:
# Sales Data API

def sales_data_api(spark, text_file_path):
    """Load pipe-delimited sales data into a Hive table partitioned by sale date.

    Args:
        spark: SparkSession used to read the file and write the table.
        text_file_path: Path of the '|'-delimited sales file; the first row
            is treated as a header.

    Returns:
        The name of the Hive table that was written.
    """
    # Explicit schema keeps control over the column types instead of
    # leaving them to inference.
    sales_schema = StructType([
        StructField("seller_id", IntegerType(), True),
        StructField("product_id", IntegerType(), True),
        StructField("buyer_id", IntegerType(), True),
        StructField("sale_date", StringType(), True),  # read as string, cast below
        StructField("quantity", IntegerType(), True),
        StructField("price", IntegerType(), True),
    ])

    # Read from the path the caller passed in rather than the notebook-level
    # `sales_filepath` variable, so the function works for any input file.
    sales = (
        spark.read.format("csv")
        .option("delimiter", "|")
        .option("header", True)
        .schema(sales_schema)
        .load(text_file_path)
    )

    # Convert sale_date from string to a date column before partitioning on it.
    sales = sales.withColumn("sale_date", col("sale_date").cast(DateType()))

    # This name must match the table the notebook reads back afterwards
    # (spark.table("partitioned_sale_date")).
    hive_table_name = "partitioned_sale_date"

    # Overwrite any previous version of the table; one partition per sale_date.
    (
        sales.write.mode("overwrite")
        .partitionBy("sale_date")
        .format("parquet")
        .saveAsTable(hive_table_name)
    )
    print(f"Hive Table with name '{hive_table_name}' created sucessfully.")

    return hive_table_name


### Product Data Collector API

#### It reads product data from a CSV file and writes it into a non-partitioned Hive table in Parquet format.

In [0]:
def product_data_api(spark, file_path):
    """Load product data from a CSV file into a non-partitioned Hive table.

    Args:
        spark: SparkSession used to read the file and write the table.
        file_path: Path of the product CSV file; the first row is treated
            as a header.

    Returns:
        The name of the Hive table that was written.
    """
    hive_table_name = "products_table"

    # Fixed schema so the column types are not left to inference.
    columns = [
        StructField("product_id", IntegerType(), True),
        StructField("product_name", StringType(), True),
        StructField("unit_price", IntegerType(), True),
    ]
    product_schema = StructType(columns)

    # Read the CSV with its header row, applying the schema above.
    reader = spark.read.format("csv").option("header", True).schema(product_schema)
    product_df = reader.load(file_path)

    # Replace any existing table; stored as Parquet without partitioning.
    writer = product_df.write.mode("overwrite").format("parquet")
    writer.saveAsTable(hive_table_name)

    print(f"Hive table with name '{hive_table_name}' created successfully.")
    return hive_table_name


#### Milestone 2

##### Partitioned Hive Table in Parquet Format for Sales_data

In [0]:
# Run sales_data_api on the pipe-delimited sales file; it prints a confirmation
# and returns the name of the Hive table it wrote.

sales_data_api(spark,sales_filepath)

Hive Table with name 'partitioned_sale_date' created sucessfully.
Out[7]: 'partitioned_sale_date'

In [0]:
# Display the rows of the Hive table named "partitioned_sale_date".
# NOTE: this name must match the table that sales_data_api writes.

spark.table("partitioned_sale_date").display()

seller_id,product_id,buyer_id,quantity,price,sale_date
2,1,163,2,2000,2023-05-29
1,1,198,1,1000,2023-05-29
5,1,282,1,1000,2023-05-29
3,1,309,3,3000,2023-05-29
4,1,318,3,3000,2023-05-29
4,1,321,1,1000,2023-05-29
3,1,337,2,2000,2023-05-29
5,1,340,2,2000,2023-05-29
4,1,343,1,1000,2023-05-29
3,3,642,1,1400,2023-05-29



##### Non-partitioned Hive Table for Product_Data

In [0]:
# Run product_data_api on the product CSV file; it prints a confirmation
# and returns the name of the Hive table it wrote.

product_data_api(spark, product_filepath_csv)

Hive table with name 'products_table' created successfully.
Out[12]: 'products_table'

In [0]:
# Display the rows of the Hive table named "products_table".

spark.table("products_table").display()

product_id,product_name,unit_price
1,S8,1000
2,G4,800
3,iPhone,1400
