In [82]:
import getpass
import json
import os

import matplotlib.pyplot as plt
import pandas as pd
import psycopg2
import psycopg2.extras as extras
import requests
class DataPipeline:
    """Load sales, user and weather data, merge them, and write frames to PostgreSQL.

    Parameters
    ----------
    sales_data_path
        Anything ``pandas.read_csv`` accepts; the CSV must contain an
        ``order_date`` column (and ``customer_id`` for the merge step).
    **args
        Optional overrides: ``user_details_api``, ``weather_api``,
        ``weather_api_key``, ``lat``, ``lon`` and ``request_timeout``
        (seconds, default 30).
    """

    # Columns kept from the user-details and weather API responses.
    _USER_COLUMNS = ["id", "name", "username", "email", "address"]
    _WEATHER_COLUMNS = ["pres", "slp", "wind_spd", "temp", "rh", "clouds",
                        "precip", "snow", "datetime"]

    def __init__(self, sales_data_path, **args):
        self.sales_data_path = sales_data_path
        self.user_details_api = args.get("user_details_api", 'https://jsonplaceholder.typicode.com/users')
        self.weather_api = args.get("weather_api", 'https://api.weatherbit.io/v2.0/history/daily')
        # Key resolution order: explicit keyword > WEATHERBIT_API_KEY env var > legacy literal.
        # NOTE(security): the literal is kept only so existing callers keep working;
        # it is exposed in source and should be rotated and removed.
        if "weather_api_key" in args:
            self.weather_api_key = args["weather_api_key"]
        else:
            self.weather_api_key = os.environ.get("WEATHERBIT_API_KEY", '723ebf6e3dac4b9bad7f2f11e03bc367')
        # default lat & lon pointing to walmart store
        self.lat, self.lon = args.get("lat", 36.358), args.get("lon", -94.209)
        # Upper bound (seconds) for the weather HTTP request so it cannot hang forever.
        self.request_timeout = args.get("request_timeout", 30)

    def read_data(self):
        """Populate ``sales_df``, ``user_df`` and ``weather_df``.

        The weather request covers the min..max ``order_date`` found in the
        sales data.

        Raises
        ------
        requests.HTTPError
            If the weather API answers with an error status.
        """
        self.sales_df = pd.read_csv(self.sales_data_path)
        self.user_df = pd.read_json(self.user_details_api)[self._USER_COLUMNS]

        order_dates = self.sales_df["order_date"]
        weather_api_params = {
            "lat": self.lat,
            "lon": self.lon,
            "start_date": str(order_dates.min()),
            "end_date": str(order_dates.max()),
            "key": self.weather_api_key,
        }
        weather_result = requests.get(
            self.weather_api, params=weather_api_params, timeout=self.request_timeout
        )
        # Fail loudly on 4xx/5xx instead of trying to parse an error body.
        weather_result.raise_for_status()
        # Decode via the Response object; json.loads() does not accept a Response.
        weather_payload = weather_result.json()
        self.weather_df = pd.json_normalize(weather_payload["data"])[self._WEATHER_COLUMNS]

    def merge_to_single_df(self):
        """Return sales left-joined with users (on customer id) and weather (on date)."""
        sales_user_df = self.sales_df.merge(
            self.user_df, how='left', left_on=["customer_id"], right_on=["id"]
        ).drop("id", axis="columns")
        return sales_user_df.merge(
            self.weather_df, how='left', left_on="order_date", right_on="datetime"
        )

    def make_db_connection(self, host, dbname, user, password):
        """Open a psycopg2 connection and keep it on ``self.conn``."""
        self.conn = psycopg2.connect(host=host, dbname=dbname, user=user, password=password)

    @staticmethod
    def _prepare_frame(df):
        """Return a new frame ready for insertion; the caller's object is never modified.

        A Series is turned into a frame, a column literally named ``index`` is
        dropped, and every column containing at least one dict is JSON-encoded.
        """
        if isinstance(df, pd.Series):
            df = df.reset_index()
        if 'index' in df.columns:
            df = df.drop('index', axis='columns')
        frame = df.copy()
        for col in frame.columns:
            values = frame[col]
            # Dicts can only live in object columns; scan the values instead of
            # looking up label 0, which may not exist in the index.
            if values.dtype == object and values.map(lambda v: isinstance(v, dict)).any():
                frame[col] = values.apply(json.dumps)
        return frame

    def write_to_db(self, df, table_name):
        """Bulk-insert ``df`` (DataFrame or Series) into ``table_name``.

        ``table_name`` and the column names are interpolated into the SQL text
        verbatim, so they must come from trusted code, never from user input.

        Returns
        -------
        None on success; ``1`` if the insert failed (the error is printed and
        the transaction rolled back).
        """
        frame = self._prepare_frame(df)
        # itertuples yields per-column Python scalars, so integer columns are
        # neither upcast to float nor left as numpy integer objects.
        tuples = list(frame.itertuples(index=False, name=None))
        cols = ','.join(str(c) for c in frame.columns)
        query = "INSERT INTO %s(%s) VALUES %%s" % (table_name, cols)
        cursor = self.conn.cursor()
        try:
            extras.execute_values(cursor, query, tuples)
            self.conn.commit()
        except Exception as error:  # psycopg2.DatabaseError is an Exception subclass
            print("Error: %s" % error)
            self.conn.rollback()
            return 1
        finally:
            # Single place that releases the cursor on both paths.
            cursor.close()
        print(f"{table_name} {frame.shape[0]} records were inserted")
class VisualizeData:
    """Small plotting helper bound to one DataFrame."""

    def __init__(self, dataframe):
        """Keep a reference to ``dataframe`` for later plotting calls."""
        self.df = dataframe

    def show_histogram(self, x, y):
        """Draw and show a 20-bin histogram of column ``x`` weighted by column ``y``.

        Bars are drawn with a baseline offset of 5 and red edges.
        """
        values, weights = self.df[x], self.df[y]
        hist_options = dict(bins=20, bottom=5, weights=weights, edgecolor='red')
        plt.hist(values, **hist_options)
        plt.title(f'Histogram for {x} and {y} Columns')
        plt.xlabel(x)
        plt.ylabel(y)
        plt.show()

In [83]:
if __name__ == "__main__":
    # Initialize DataPipeline. The CSV location can be overridden with the
    # SALES_DATA_PATH environment variable; the previous local path is the fallback.
    sales_data_path = os.environ.get("SALES_DATA_PATH", "C:/Users/Mohamed/Downloads/sales_data.csv")
    pipe = DataPipeline(sales_data_path)
    # Read necessary data
    pipe.read_data()
    # Merge all dataframes into a single dataframe
    df = pipe.merge_to_single_df()

    # Determine the average order quantity per product
    product_avg = df.groupby('product_id', as_index=False)['quantity'].mean()
    # Calculate total sales amount per customer
    sales_per_cust = (
        df.groupby('customer_id', as_index=False)['price']
        .sum()
        .rename(columns={'price': 'order_value'})
    )

    # Date-indexed copy used for the time-based and weather-based aggregations.
    df1 = df.assign(order_date=pd.to_datetime(df["order_date"])).set_index('order_date')

    # Analyze sales trends over time (e.g., monthly or quarterly sales)
    # Resample the data to get monthly or quarterly sales
    # 'M' represents monthly, 'Q' represents quarterly
    # (pandas 2.2+ deprecates these aliases in favour of 'ME' / 'QE').
    monthly_sales = df1['quantity'].resample('M').sum().reset_index(name='monthly_sales_quantity')
    quarterly_sales = df1['quantity'].resample('Q').sum().reset_index(name='quarterly_sales_quantity')

    temp_bins = 10
    wind_speed_bins = 10  # currently unused below; kept in case later cells rely on it
    precip_bins = 10

    # Categorize weather data into bins
    # Turn temperature and precip (rain) into ranges for weather-based sales analysis
    df1['temp_bin'] = pd.cut(df1['temp'], bins=temp_bins).astype(str)
    df1['precip_bin'] = pd.cut(df1['precip'], bins=precip_bins).astype(str)

    # NOTE: despite the "average_" names (kept because the target tables use them),
    # these are TOTAL quantities sold per bin, not averages.
    # Total quantity sold per rain condition
    average_sales_rain_based = df1.groupby(['precip_bin'])['quantity'].sum().reset_index(name='sale_count')
    # Total quantity sold per temperature condition
    average_sales_temp_based = df1.groupby(['temp_bin'])['quantity'].sum().reset_index(name='sale_count')

    # Database connection details. Nothing secret is stored in the notebook:
    # the password comes from PGPASSWORD, or is prompted for when that is unset.
    host = os.environ.get("PGHOST", "db.lzzqdwfecduvpsotjsim.supabase.co")
    dbname = os.environ.get("PGDATABASE", "postgres")
    user = os.environ.get("PGUSER", "postgres")
    password = os.environ.get("PGPASSWORD") or getpass.getpass("Database password: ")

    # Call function to establish the database connection
    pipe.make_db_connection(host, dbname, user, password)

    # Write data to tables; write_to_db returns 1 when an insert fails.
    tables_to_write = [
        (df, 'sales_data'),
        (product_avg, 'product_sales_avg'),
        (sales_per_cust, 'sales_per_cust'),
        (monthly_sales, 'monthly_sales'),
        (quarterly_sales, 'quarterly_sales'),
        (average_sales_rain_based, 'average_sales_rain_based'),
        (average_sales_temp_based, 'average_sales_temp_based'),
    ]
    failed_tables = [
        table_name
        for frame, table_name in tables_to_write
        if pipe.write_to_db(frame, table_name) == 1
    ]
    if failed_tables:
        print("Failed to write tables: %s" % ", ".join(failed_tables))

sales_data 1000 records were inserted
product_sales_avg 50 records were inserted
sales_per_cust 10 records were inserted
monthly_sales 13 records were inserted
quarterly_sales 5 records were inserted
average_sales_rain_based 7 records were inserted
average_sales_temp_based 11 records were inserted
