In [57]:
import pandas as pd
from pyspark.sql import SparkSession
from io import StringIO
from io import BytesIO

In [58]:
# Building the session can take a while locally: Spark must resolve the Maven
# package listed in spark.jars.packages (downloaded if not already cached).
# NOTE(review): spark-excel does not appear to be used anywhere in this
# notebook; dropping it would speed up start-up — confirm before removing.
spark = (
    SparkSession.builder
    .appName("pipeline")
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.11:0.12.2")
    .getOrCreate()
)

In [59]:
import os

# Read credentials/config from the environment instead of pasting them into
# the notebook, where they leak into version control and shared copies.
# When a credential or the region is None, boto3 falls back to its default
# resolution chain (shared config/credentials files, instance profile, ...).
access_id = os.environ.get("AWS_ACCESS_KEY_ID")
access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
region = os.environ.get("AWS_DEFAULT_REGION")
# Bucket and key prefix for the pipeline's input/output objects.
s3_bucket = os.environ.get("S3_BUCKET")
prefix_path = os.environ.get("S3_PREFIX_PATH")

In [60]:
import boto3
import pandas as pd
import io


# High-level S3 resource (Bucket/Object API), used for listing and for
# uploads in write_dataframe_to_s3.
s3_resource = boto3.resource(
    service_name='s3',
    region_name=region,
    aws_access_key_id=access_id,
    aws_secret_access_key=access_key,
)

# Low-level client for get_object calls. Reuse the resource's own client so
# both share one set of credentials/region instead of configuring two.
s3_client = s3_resource.meta.client

# boto3 must be installed before this cell runs (e.g. `%pip install boto3` in
# a setup cell at the top of the notebook); installing it after
# `import boto3` has no effect on this cell.

In [61]:
import boto3
import pandas as pd
from io import StringIO

def read_s3_files(client, bucket, path):
    """Download one S3 object and parse it as CSV into a pandas DataFrame.

    Parameters
    ----------
    client : boto3 S3 low-level client (anything exposing ``get_object``).
    bucket : name of the bucket holding the object.
    path : object key to read.

    Returns
    -------
    pandas.DataFrame parsed from the object's body with ``pd.read_csv``.
    """
    # Use the arguments, not the notebook globals, so the helper reads the
    # object the caller asked for.
    obj = client.get_object(Bucket=bucket, Key=path)
    return pd.read_csv(obj['Body'])

def list_s3_files(client, bucket, path):
    """Yield the key of every object under a prefix in an S3 bucket.

    Parameters
    ----------
    client : boto3 S3 low-level client (anything exposing ``get_paginator``).
    bucket : bucket name.
    path : key prefix to list under.

    Yields
    ------
    str : object keys, in the order S3 returns them.
    """
    # A single list_objects call returns at most 1000 keys; the paginator
    # follows continuation tokens so large prefixes are listed completely.
    paginator = client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=path):
        # A page carries no 'Contents' entry when nothing matches the prefix.
        for content in page.get('Contents', []):
            yield content.get('Key')

def write_dataframe_to_s3(client, dataframe, bucket, filepath, outputfile_format):
    """Serialize a pandas DataFrame in memory and upload it as one S3 object.

    Parameters
    ----------
    client : boto3 S3 *resource* (not a low-level client). The upload goes
        through ``client.Object(bucket, filepath).put(...)``, which only the
        resource API provides; the parameter name is kept for compatibility.
    dataframe : pandas.DataFrame to write.
    bucket : target bucket name.
    filepath : object key to write.
    outputfile_format : ``'parquet'`` or ``'csv'``. CSV output is
        pipe-delimited (``sep="|"``) and written without the index.

    Raises
    ------
    ValueError
        If ``outputfile_format`` is not a supported format. Previously this
        surfaced as an UnboundLocalError on ``out_buffer``.
    """
    if outputfile_format not in ('parquet', 'csv'):
        raise ValueError(
            "Unsupported outputfile_format {!r}; expected 'parquet' or 'csv'"
            .format(outputfile_format)
        )

    print("Writing {} records to S3 File: {}".format(len(dataframe), filepath))
    if outputfile_format == 'parquet':
        # Parquet is binary; pandas needs a parquet engine (pyarrow or
        # fastparquet) installed for this call.
        out_buffer = BytesIO()
        dataframe.to_parquet(out_buffer, index=False)
    else:
        out_buffer = StringIO()
        dataframe.to_csv(out_buffer, sep="|", index=False)

    client.Object(bucket, filepath).put(Body=out_buffer.getvalue())

In [62]:
# Discover the CSV extracts under the "gluedemo" prefix.
files = [
    str(summary.key)
    for summary in s3_resource.Bucket(s3_bucket).objects.filter(Prefix="gluedemo")
    if summary.key.endswith('.csv')
]
print(files)

# Load each extract into a frame keyed by the folder name found in its key.
# NOTE(review): every loaded frame shows an "Unnamed: 0" column, which
# suggests the CSVs were written with their pandas index; consider
# pd.read_csv(..., index_col=0) — confirm against the producer.
dataset_folders = ('dimdate', 'dimproduct', 'dimstore', 'factsales')
frames = {}
for file in files:
    print(file)
    for folder in dataset_folders:
        if folder in file:
            # Download only objects that map to a dataset we use.
            obj = s3_client.get_object(Bucket=s3_bucket, Key=file)
            frames[folder] = pd.read_csv(obj['Body'])
            break

# Fail here, naming what is missing, rather than with a NameError
# several cells later.
missing = [folder for folder in dataset_folders if folder not in frames]
if missing:
    raise FileNotFoundError("No CSV found under 'gluedemo' for: {}".format(missing))

calendar_df = frames['dimdate']
product_df = frames['dimproduct']
store_df = frames['dimstore']
sales_df = frames['factsales']

['gluedemo/dimdate/calendar.csv', 'gluedemo/dimproduct/product.csv', 'gluedemo/dimstore/store.csv', 'gluedemo/factsales/sales.csv']
gluedemo/dimdate/calendar.csv
gluedemo/dimproduct/product.csv
gluedemo/dimstore/store.csv
gluedemo/factsales/sales.csv


In [63]:
# Peek at the first two rows of the product dimension.
product_df.iloc[:2]

Unnamed: 0,productid,division,gender,category
0,567228914507,APPAREL,KIDS,CRICKET
1,565177969035,FOOTWEAR,MENS,COLLECTIONS


In [64]:
# Peek at the first two rows of the store dimension.
store_df.iloc[:2]

Unnamed: 0,storeid,channel,country
0,409,Digital,INDIA
1,410,Digital,CHINA


In [65]:
# Peek at the first two rows of the sales fact table.
sales_df.iloc[:2]

Unnamed: 0,saleId,netSales,salesUnits,storeId,dateId,productId
0,1,300.24,5,409,4965,567228914507
1,2,300.24,5,409,4965,567228914507


In [66]:
# Peek at the first two rows of the date dimension.
calendar_df.iloc[:2]

Unnamed: 0,datekey,datecalendarday,datecalendaryear,weeknumberofseason
0,4965,1,2018,2
1,4966,2,2018,2


## Convert to Spark DataFrames

In [67]:
# Date dimension as a Spark DataFrame (schema inferred from the pandas frame).
calendar_sdf = spark.createDataFrame(data=calendar_df)

In [68]:
# Product dimension as a Spark DataFrame (schema inferred from the pandas frame).
product_sdf = spark.createDataFrame(data=product_df)

In [69]:
# Store dimension as a Spark DataFrame (schema inferred from the pandas frame).
store_sdf = spark.createDataFrame(data=store_df)

In [70]:
# Sales fact table as a Spark DataFrame (schema inferred from the pandas frame).
sales_sdf = spark.createDataFrame(data=sales_df)

In [71]:
# Register each Spark frame under the view name the SQL below refers to.
temp_views = {
    "view_product": product_sdf,
    "view_store": store_sdf,
    "view_date": calendar_sdf,
    "view_sales": sales_sdf,
}
for view_name, frame in temp_views.items():
    frame.createOrReplaceTempView(view_name)

In [72]:
# Star-schema join: one row per sale from view_sales, enriched with date,
# store and product attributes. `uniquekey` is year, channel, division,
# gender and category joined with '_'.
# The JOINs are inner joins, so a sale whose dateid/storeid/productid has no
# match in its dimension view is dropped from the result.
# NOTE(review): Spark's concat() yields NULL if any part is NULL;
# concat_ws('_', ...) would skip NULL parts instead — confirm which is wanted.
# NOTE(review): uniquekey omits country and weeknumberofseason, which the
# aggregation below also groups by, so it need not be unique per aggregated
# row — confirm that is intended.
finalsql = """Select concat(CAST(dd.datecalendaryear as varchar(20)),'_', 
            CAST(ds.channel as varchar(20)), '_', CAST(dp.division as varchar(20)), '_', CAST(dp.gender as varchar(20))
            , '_', CAST(dp.category as varchar(20))) as uniquekey, fs.dateid, fs.storeid, fs.productid, fs.saleid, fs.netsales, fs.salesunits ,
                dd.datecalendaryear, dd.weeknumberofseason,
                ds.channel, ds.country,
                dp.division, dp.gender, dp.category
                from view_sales fs 
                JOIN view_date dd on dd.datekey = fs.dateid
                JOIN view_store ds on ds.storeid = fs.storeid
                JOIN view_product dp on dp.productid = fs.productid"""

In [73]:
# Run the star-schema join; result is one enriched row per sale.
salesDF = spark.sql(sqlQuery=finalsql)

In [74]:
# Expose the joined rows to the aggregation query as view_salesfinal.
salesDF.createOrReplaceTempView(name="view_salesfinal")

In [75]:
# Roll the joined sales rows up to one row per
# (uniquekey, channel, country, division, category, year, week), summing
# net sales and units.
# Gender is not selected on its own, but it is embedded in uniquekey, so
# grouping by uniquekey still keeps genders apart.
aggregate = """Select uniquekey, channel, country, division, category, datecalendaryear, weeknumberofseason,
               SUM(netsales) netsales, SUM(salesunits) salesunits
               FROM view_salesfinal
               GROUP BY uniquekey, channel, country, division, category, datecalendaryear, weeknumberofseason"""


In [76]:
# Bind the aggregate to its own name only. salesDF keeps pointing at the
# joined detail rows, so re-running the view-registration cell still
# registers the detail rows (not the aggregate) as view_salesfinal.
final_df = spark.sql(aggregate)

In [77]:
# Print the first three aggregated rows.
final_df.show(n=3)

+--------------------+-------+------------+--------+-----------+----------------+------------------+--------+----------+
|           uniquekey|channel|     country|division|   category|datecalendaryear|weeknumberofseason|netsales|salesunits|
+--------------------+-------+------------+--------+-----------+----------------+------------------+--------+----------+
|2018_Digital_APPA...|Digital|LUXEMBOURG  | APPAREL|    CRICKET|            2018|                 2|  1501.2|        28|
|2018_Digital_FOOT...|Digital|       INDIA|FOOTWEAR|COLLECTIONS|            2018|                 2| 2266.68|        37|
|2018_Digital_APPA...|Digital|LUXEMBOURG  | APPAREL|COLLECTIONS|            2018|                 2| 1150.48|        13|
+--------------------+-------+------------+--------+-----------+----------------+------------------+--------+----------+
only showing top 3 rows



In [78]:
# Collect the aggregated Spark result to the driver as a pandas DataFrame so
# write_dataframe_to_s3 can serialize it. toPandas() pulls every row into
# driver memory; the write cell reported 8 records, so this is cheap here.
final_pdf = final_df.toPandas()

In [80]:
# Persist the aggregated report to S3 as a single parquet object.
write_dataframe_to_s3(
    client=s3_resource,
    dataframe=final_pdf,
    bucket=s3_bucket,
    filepath='gluedemo/reportsales/final.parquet',
    outputfile_format='parquet',
)

Writing 8 records to S3 File: gluedemo/reportsales/final.parquet
came inside of  parquet format
