In [179]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col ,isnan,expr,round , when, max
from datetime import datetime
import psycopg2
import boto3



In [3]:
spark = SparkSession.builder.appName("FileProcessing").getOrCreate()

In [4]:
folder_path = "D:\Custom_assignment\Data"

In [137]:
df = spark.read.format("csv").option("header","true").option("inferSchema","true").load(folder_path)
df.show()

+------+---------+--------+---------+-------+
|Region|  Product|    Date|UnitsSold|Revenue|
+------+---------+--------+---------+-------+
| North|Product A|1/1/2022|      100|   null|
| North|Product B|1/1/2022|      150|   2000|
| South|Product A|1/1/2022|      120|   1500|
| South|Product B|    NULL|       80|   1000|
|  West|Product A|1/1/2022|       90|   1200|
|  West|Product B|1/1/2022|      200|   2500|
| North|Product A|1/2/2022|      110|   1200|
| North|Product B|1/2/2022|      140|   1800|
| South|Product A|1/2/2022|      130|   1600|
| South|Product B|1/2/2022|       70|    900|
|  West|Product A|1/2/2022|      100|   1300|
|  West|Product B|1/2/2022|      190|   2400|
| North|     NULL|    NULL|      100|   NULL|
+------+---------+--------+---------+-------+



In [138]:
df.describe().show()

+-------+------+---------+--------+------------------+------------------+
|summary|Region|  Product|    Date|         UnitsSold|           Revenue|
+-------+------+---------+--------+------------------+------------------+
|  count|    13|       13|      13|                13|                12|
|   mean|  null|     null|    null|121.53846153846153|1581.8181818181818|
| stddev|  null|     null|    null|  39.7588887013864|   540.03366898404|
|    min| North|     NULL|1/1/2022|                70|              1000|
|    max|  West|Product B|    NULL|               200|              NULL|
+-------+------+---------+--------+------------------+------------------+



In [139]:
df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- UnitsSold: integer (nullable = true)
 |-- Revenue: string (nullable = true)



In [140]:
df = df.filter( (col('Product').isNotNull()) & (col('UnitsSold').isNotNull()) & (col('Revenue').isNotNull()) & (col('Product') != 'NULL') & (col('Date') != 'NULL') & (col('Revenue') != 'NULL'))

In [141]:
df.show()

+------+---------+--------+---------+-------+
|Region|  Product|    Date|UnitsSold|Revenue|
+------+---------+--------+---------+-------+
| North|Product B|1/1/2022|      150|   2000|
| South|Product A|1/1/2022|      120|   1500|
|  West|Product A|1/1/2022|       90|   1200|
|  West|Product B|1/1/2022|      200|   2500|
| North|Product A|1/2/2022|      110|   1200|
| North|Product B|1/2/2022|      140|   1800|
| South|Product A|1/2/2022|      130|   1600|
| South|Product B|1/2/2022|       70|    900|
|  West|Product A|1/2/2022|      100|   1300|
|  West|Product B|1/2/2022|      190|   2400|
+------+---------+--------+---------+-------+



In [142]:
df = df.withColumn('unit_price', round(expr('revenue/unitsSold'),2))

In [143]:
df.show()

+------+---------+--------+---------+-------+----------+
|Region|  Product|    Date|UnitsSold|Revenue|unit_price|
+------+---------+--------+---------+-------+----------+
| North|Product B|1/1/2022|      150|   2000|     13.33|
| South|Product A|1/1/2022|      120|   1500|      12.5|
|  West|Product A|1/1/2022|       90|   1200|     13.33|
|  West|Product B|1/1/2022|      200|   2500|      12.5|
| North|Product A|1/2/2022|      110|   1200|     10.91|
| North|Product B|1/2/2022|      140|   1800|     12.86|
| South|Product A|1/2/2022|      130|   1600|     12.31|
| South|Product B|1/2/2022|       70|    900|     12.86|
|  West|Product A|1/2/2022|      100|   1300|      13.0|
|  West|Product B|1/2/2022|      190|   2400|     12.63|
+------+---------+--------+---------+-------+----------+



In [144]:
df = df.withColumn('cost_price', when(col('product') == 'Product A', 11)
                                .when(col('product') == 'Product B', 11.2)
                                .otherwise(10))

In [145]:
df.show()

+------+---------+--------+---------+-------+----------+----------+
|Region|  Product|    Date|UnitsSold|Revenue|unit_price|cost_price|
+------+---------+--------+---------+-------+----------+----------+
| North|Product B|1/1/2022|      150|   2000|     13.33|      11.2|
| South|Product A|1/1/2022|      120|   1500|      12.5|      11.0|
|  West|Product A|1/1/2022|       90|   1200|     13.33|      11.0|
|  West|Product B|1/1/2022|      200|   2500|      12.5|      11.2|
| North|Product A|1/2/2022|      110|   1200|     10.91|      11.0|
| North|Product B|1/2/2022|      140|   1800|     12.86|      11.2|
| South|Product A|1/2/2022|      130|   1600|     12.31|      11.0|
| South|Product B|1/2/2022|       70|    900|     12.86|      11.2|
|  West|Product A|1/2/2022|      100|   1300|      13.0|      11.0|
|  West|Product B|1/2/2022|      190|   2400|     12.63|      11.2|
+------+---------+--------+---------+-------+----------+----------+



In [146]:
df = df.withColumn('profit_percent', round(((col('unit_price') - col('cost_price'))*100 / col('unit_price')), 2).cast('double'))

In [147]:
df.show()

+------+---------+--------+---------+-------+----------+----------+--------------+
|Region|  Product|    Date|UnitsSold|Revenue|unit_price|cost_price|profit_percent|
+------+---------+--------+---------+-------+----------+----------+--------------+
| North|Product B|1/1/2022|      150|   2000|     13.33|      11.2|         15.98|
| South|Product A|1/1/2022|      120|   1500|      12.5|      11.0|          12.0|
|  West|Product A|1/1/2022|       90|   1200|     13.33|      11.0|         17.48|
|  West|Product B|1/1/2022|      200|   2500|      12.5|      11.2|          10.4|
| North|Product A|1/2/2022|      110|   1200|     10.91|      11.0|         -0.82|
| North|Product B|1/2/2022|      140|   1800|     12.86|      11.2|         12.91|
| South|Product A|1/2/2022|      130|   1600|     12.31|      11.0|         10.64|
| South|Product B|1/2/2022|       70|    900|     12.86|      11.2|         12.91|
|  West|Product A|1/2/2022|      100|   1300|      13.0|      11.0|         15.38|
|  W

In [148]:
df.createOrReplaceTempView("sales_data")

In [149]:
avg_product_price = spark.sql("""
    SELECT Region, Product, AVG(unit_price) AS avg_product_price
    FROM sales_data
    GROUP BY Region, Product
""")

In [150]:
avg_product_price.show()

+------+---------+------------------+
|Region|  Product| avg_product_price|
+------+---------+------------------+
| South|Product B|             12.86|
|  West|Product B|12.565000000000001|
| North|Product B|13.094999999999999|
|  West|Product A|            13.165|
| South|Product A|12.405000000000001|
| North|Product A|             10.91|
+------+---------+------------------+



In [151]:
max_unit_price = spark.sql("""
    SELECT Region, MAX(unit_price) AS max_unit_price
    FROM sales_data
    GROUP BY Region
""")

In [152]:
max_unit_price.show()

+------+--------------+
|Region|max_unit_price|
+------+--------------+
| South|         12.86|
|  West|         13.33|
| North|         13.33|
+------+--------------+



In [153]:
max_profit_per_product = spark.sql("""
    SELECT Region, Product, MAX(profit_percent) AS max_profit
    FROM sales_data
    GROUP BY Region, Product
""")

max_profit_per_product.show()


+------+---------+----------+
|Region|  Product|max_profit|
+------+---------+----------+
| South|Product B|     12.91|
|  West|Product B|     11.32|
| North|Product B|     15.98|
|  West|Product A|     17.48|
| South|Product A|      12.0|
| North|Product A|     -0.82|
+------+---------+----------+



In [154]:
df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- UnitsSold: integer (nullable = true)
 |-- Revenue: string (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- cost_price: double (nullable = false)
 |-- profit_percent: double (nullable = true)



In [168]:
db_host = 'salesdb.cluster-cwtacdgkluou.us-east-1.rds.amazonaws.com'
db_name = 'abc_sales'
db_user = 'postgres'
db_password = 'postgres'
db_port = '5432'
    
conn = psycopg2.connect(
    host=db_host,
    dbname=db_name,
    user=db_user,
    password=db_password,
    port=db_port
)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS public.abc_sales
(
id bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY ( INCREMENT 1 START 1 MINVALUE 1 MAXVALUE 99999999999 CACHE 1 ),
region character varying(20) COLLATE pg_catalog."default",
product character varying(20) COLLATE pg_catalog."default",
date timestamp without time zone,
unitsold bigint,
revenue character varying(50) COLLATE pg_catalog."default",
unit_price double precision,
cost_price double precision,
profit_percent double precision,
CONSTRAINT abc_sales_pkey PRIMARY KEY (id)
    )''')
conn.commit()




In [177]:
df_values = df.collect()
    
for row in df_values:
    cursor.execute('''INSERT INTO public.abc_sales (region, product, date, unitsold, revenue, unit_price, cost_price, profit_percent) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)''',     
    (row['Region'],
    row['Product'],
    row['Date'],
    row['UnitsSold'],
    row['Revenue'],
    row['unit_price'],
    row['cost_price'],
    row['profit_percent']))
    conn.commit()
conn.close()

In [180]:
s3_client = boto3.client('s3')

In [188]:
df.write.csv('D:/Custom_assignment/Data/outputFile', header=True,mode='append')

In [191]:
s3 = boto3.client('s3')
bucket_name = 'savecsvintoprasads3bucket'
s3_folder_path = 'ABC-Sales/'
local_file_path = 'D:/Custom_assignment/Data/outputFile/*.csv'
s3_file_path = s3_folder_path + 'transformed_sales.csv'

s3.upload_file(local_file_path, bucket_name, s3_file_path)

OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect: 'D:/Custom_assignment/Data/outputFile/*.csv'