In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import month
from pyspark.sql.functions import col, round

In [3]:
spark = SparkSession.builder.appName('Coffee Sales').getOrCreate()

In [4]:
data = spark.read.csv('data.csv',inferSchema=True,header=True,encoding='utf8')

In [5]:
data.show()

+---------+----+--------------------------------------+----------+----------------+------+------------+-------+---------+------------+------------+-----------------+------+-----+-------------+-----------+-------------+-------------+-------------+--------------+-------+
|Area Code|Cogs|DifferenceBetweenActualandTargetProfit|      Date|Inventory Margin|Margin| Market_size| Market|Marketing|Product_line|Product_type|          Product|Profit|Sales|        State|Target_cogs|Target_margin|Target_profit|Target_sales |Total_expenses|   Type|
+---------+----+--------------------------------------+----------+----------------+------+------------+-------+---------+------------+------------+-----------------+------+-----+-------------+-----------+-------------+-------------+-------------+--------------+-------+
|      303|  51|                                   -35| 10/1/2012|             503|    71|Major Market|Central|       46|      Leaves|  Herbal Tea|            Lemon|    -5|  122|     Colorad

In [6]:
data.printSchema()

root
 |-- Area Code: integer (nullable = true)
 |-- Cogs: integer (nullable = true)
 |-- DifferenceBetweenActualandTargetProfit: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Inventory Margin: integer (nullable = true)
 |-- Margin: integer (nullable = true)
 |-- Market_size: string (nullable = true)
 |-- Market: string (nullable = true)
 |-- Marketing: integer (nullable = true)
 |-- Product_line: string (nullable = true)
 |-- Product_type: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Profit: integer (nullable = true)
 |-- Sales: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- Target_cogs: integer (nullable = true)
 |-- Target_margin: integer (nullable = true)
 |-- Target_profit: integer (nullable = true)
 |-- Target_sales : integer (nullable = true)
 |-- Total_expenses: integer (nullable = true)
 |-- Type: string (nullable = true)



In [11]:
market = data[data['Area Code'],data['Market'],data['Market_Size'],data['State']]

In [12]:
market.show()

+---------+-------+------------+-------------+
|Area Code| Market| Market_Size|        State|
+---------+-------+------------+-------------+
|      303|Central|Major Market|     Colorado|
|      970|Central|Major Market|     Colorado|
|      409|  South|Major Market|        Texas|
|      850|   East|Major Market|      Florida|
|      562|   West|Major Market|   California|
|      712|Central|Small Market|         Iowa|
|      860|   East|Small Market|  Connecticut|
|      918|  South|Small Market|     Oklahoma|
|      775|   West|Small Market|       Nevada|
|      435|   West|Small Market|         Utah|
|      603|   East|Small Market|New Hampshire|
|      603|   East|Small Market|New Hampshire|
|      603|   East|Small Market|New Hampshire|
|      318|  South|Small Market|    Louisiana|
|      775|   West|Small Market|       Nevada|
|      503|   West|Small Market|       Oregon|
|      573|Central|Small Market|     Missouri|
|      262|Central|Small Market|    Wisconsin|
|      801|  

In [13]:
product = data['Area Code','Product_type','Product','Type']

In [14]:
product.show(100)

+---------+------------+-----------------+-------+
|Area Code|Product_type|          Product|   Type|
+---------+------------+-----------------+-------+
|      303|  Herbal Tea|            Lemon|  Decaf|
|      970|  Herbal Tea|             Mint|  Decaf|
|      409|  Herbal Tea|            Lemon|  Decaf|
|      850|         Tea|       Darjeeling|Regular|
|      562|         Tea|        Green Tea|Regular|
|      712|    Espresso|   Decaf Espresso|  Decaf|
|      860|    Espresso|   Decaf Espresso|  Decaf|
|      918|      Coffee|Decaf Irish Cream|  Decaf|
|      775|      Coffee|Decaf Irish Cream|  Decaf|
|      435|    Espresso|   Decaf Espresso|  Decaf|
|      603|      Coffee|         Amaretto|Regular|
|      603|      Coffee|        Colombian|Regular|
|      603|    Espresso|      Caffe Mocha|Regular|
|      318|    Espresso|      Caffe Latte|Regular|
|      775|      Coffee|        Colombian|Regular|
|      503|    Espresso|      Caffe Latte|Regular|
|      573|  Herbal Tea|       

In [15]:
sales_data = data['Date','Area Code','Profit','Margin','Sales','Cogs','Total_expenses','Marketing','Inventory Margin']

In [16]:
sales_data.show()

+----------+---------+------+------+-----+----+--------------+---------+----------------+
|      Date|Area Code|Profit|Margin|Sales|Cogs|Total_expenses|Marketing|Inventory Margin|
+----------+---------+------+------+-----+----+--------------+---------+----------------+
| 10/1/2012|      303|    -5|    71|  122|  51|            76|       46|             503|
| 10/1/2012|      970|    26|    71|  123|  52|            45|       17|             405|
| 10/2/2012|      409|    28|    64|  107|  43|            36|       13|             419|
| 10/3/2012|      850|    35|    56|   94|  38|            21|       10|             871|
| 10/4/2012|      562|    56|   110|  182|  72|            54|       23|             650|
| 10/5/2012|      712|    31|    43|   43|   0|            12|        0|             430|
| 10/6/2012|      860|    21|    64|  111|  47|            43|       15|             375|
| 10/7/2012|      918|    21|    39|   66|  27|            18|        7|             859|
| 10/8/201

In [19]:
sales_data.describe()

DataFrame[summary: string, Date: string, Area Code: string, Profit: string, Margin: string, Sales: string, Cogs: string, Total_expenses: string, Marketing: string, Inventory Margin: string]

In [21]:
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import date_format
from pyspark.sql.functions import year

In [22]:
calender = data.select('Date')

In [23]:
cal = calender.withColumn('Date', to_timestamp(calender['Date'],'dd/MM/yy HH:mm:ss'))

In [24]:
calender = cal

In [26]:
calender = calender.withColumn('Day',date_format(calender['Date'],'E')).withColumn('Month', date_format(calender['Date'], 'MMMM'))

In [27]:
calender = calender.withColumn('Year',year(cal['Date']))

In [28]:
calender.show()

+----+----+-----+----+
|Date| Day|Month|Year|
+----+----+-----+----+
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
|null|null| null|null|
+----+----+-----+----+
only showing top 20 rows



In [30]:
product.orderBy("Product").show(200)

+---------+------------+-----------+-------+
|Area Code|Product_type|    Product|   Type|
+---------+------------+-----------+-------+
|      603|      Coffee|   Amaretto|Regular|
|      262|      Coffee|   Amaretto|Regular|
|      603|      Coffee|   Amaretto|Regular|
|      603|      Coffee|   Amaretto|Regular|
|      262|      Coffee|   Amaretto|Regular|
|      603|      Coffee|   Amaretto|Regular|
|      440|      Coffee|   Amaretto|Regular|
|      513|      Coffee|   Amaretto|Regular|
|      603|      Coffee|   Amaretto|Regular|
|      614|      Coffee|   Amaretto|Regular|
|      234|      Coffee|   Amaretto|Regular|
|      603|      Coffee|   Amaretto|Regular|
|      563|      Coffee|   Amaretto|Regular|
|      641|      Coffee|   Amaretto|Regular|
|      515|      Coffee|   Amaretto|Regular|
|      563|      Coffee|   Amaretto|Regular|
|      715|      Coffee|   Amaretto|Regular|
|      330|      Coffee|   Amaretto|Regular|
|      541|      Coffee|   Amaretto|Regular|
|      715

In [33]:
exp = data.agg({'Total_expenses': 'sum'}).withColumnRenamed('sum(Total Expenses)', 'Total_Expenses')

In [34]:
exp.show()

+-------------------+
|sum(Total_expenses)|
+-------------------+
|              57174|
+-------------------+



In [35]:
profit = data.agg({'Profit': 'sum'}).withColumnRenamed('sum(Profit)', 'Total_Profit')

In [36]:
profit.show()

+------------+
|Total_Profit|
+------------+
|       64311|
+------------+



In [37]:
avg = data.agg({'Sales':'average'}).withColumnRenamed('avg(Sales)', 'Avg_sales')

In [38]:
avg.show()

+------------------+
|         Avg_sales|
+------------------+
|191.04990583804144|
+------------------+



In [45]:
avg_product_sales = data.join(product, on='Product',how="inner").select('Product','Sales','Profit','Total_expenses').groupBy('Product').avg().show()

+-----------------+------------------+------------------+-------------------+
|          Product|        avg(Sales)|       avg(Profit)|avg(Total_expenses)|
+-----------------+------------------+------------------+-------------------+
|   Decaf Espresso| 185.1764705882353| 69.00980392156863|  45.68627450980392|
|        Earl Grey|229.80555555555554| 82.98611111111111| 58.611111111111114|
|        Green Tea|118.33333333333333|1.2361111111111112|  46.30555555555556|
|         Amaretto|141.27083333333334|28.166666666666668| 46.958333333333336|
|            Lemon|             200.4|             63.45| 57.916666666666664|
|      Caffe Latte|160.46296296296296|  50.2962962962963| 47.666666666666664|
|Decaf Irish Cream|154.48958333333334|          33.34375|             50.375|
|      Caffe Mocha|180.96666666666667| 39.05833333333333|               63.7|
| Regular Espresso| 374.6666666666667|             191.0|  67.44444444444444|
|        Colombian|256.34166666666664|107.76666666666667|  64.48

In [44]:
data.printSchema()

root
 |-- Area Code: integer (nullable = true)
 |-- Cogs: integer (nullable = true)
 |-- DifferenceBetweenActualandTargetProfit: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Inventory Margin: integer (nullable = true)
 |-- Margin: integer (nullable = true)
 |-- Market_size: string (nullable = true)
 |-- Market: string (nullable = true)
 |-- Marketing: integer (nullable = true)
 |-- Product_line: string (nullable = true)
 |-- Product_type: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Profit: integer (nullable = true)
 |-- Sales: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- Target_cogs: integer (nullable = true)
 |-- Target_margin: integer (nullable = true)
 |-- Target_profit: integer (nullable = true)
 |-- Target_sales : integer (nullable = true)
 |-- Total_expenses: integer (nullable = true)
 |-- Type: string (nullable = true)



In [53]:
product_sales = data.join(product, on='Product',how="inner").select('Profit').groupBy('Product_type').sum().show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Product_type` cannot be resolved. Did you mean one of the following? [`Profit`].;
'Aggregate ['Product_type], ['Product_type, sum(Profit#29) AS sum(Profit)#1754L]
+- Project [Profit#29]
   +- Project [Product#28, Area Code#17, Cogs#18, DifferenceBetweenActualandTargetProfit#19, Date#20, Inventory Margin#21, Margin#22, Market_size#23, Market#24, Marketing#25, Product_line#26, Product_type#27, Profit#29, Sales#30, State#31, Target_cogs#32, Target_margin#33, Target_profit#34, Target_sales #35, Total_expenses#36, Type#37, Area Code#1705, Product_type#1715, Type#1725]
      +- Join Inner, (Product#28 = Product#1716)
         :- Relation [Area Code#17,Cogs#18,DifferenceBetweenActualandTargetProfit#19,Date#20,Inventory Margin#21,Margin#22,Market_size#23,Market#24,Marketing#25,Product_line#26,Product_type#27,Product#28,Profit#29,Sales#30,State#31,Target_cogs#32,Target_margin#33,Target_profit#34,Target_sales #35,Total_expenses#36,Type#37] csv
         +- Project [Area Code#1705, Product_type#1715, Product#1716, Type#1725]
            +- Relation [Area Code#1705,Cogs#1706,DifferenceBetweenActualandTargetProfit#1707,Date#1708,Inventory Margin#1709,Margin#1710,Market_size#1711,Market#1712,Marketing#1713,Product_line#1714,Product_type#1715,Product#1716,Profit#1717,Sales#1718,State#1719,Target_cogs#1720,Target_margin#1721,Target_profit#1722,Target_sales #1723,Total_expenses#1724,Type#1725] csv


In [54]:
import pymysql

In [55]:
from sqlalchemy import create_engine

In [None]:
connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    password='Lokesh@112',
    database='sales'
)

In [None]:
cursor = connection.cursor()

In [None]:
market_df = market.toPandas()

In [None]:
market_table_query = """
    CREATE TABLE Market (
        Area_Code INT,
        Market VARCHAR(30),
        Market_Size VARCHAR(30),
        State VARCHAR(30)
    )
"""

In [None]:
cursor.execute(product_table_query)

In [None]:
insert_query = "INSERT INTO Product (ProductID, Area_Code, Product_Type, Product, Type) VALUES (%s, %s, %s, %s, %s)"

In [None]:
data = df.values.tolist()

In [None]:
cursor.executemany(insert_query, data)

In [None]:
connection.commit()

In [None]:
df = data.toPandas()

In [None]:
sales_table_query = """
    CREATE TABLE Sales (
      ProductID INT,
      `Date` DATE,
      Area_Code INT,
      Profit DOUBLE,
      Margin DOUBLE,
      Sales DOUBLE,
      COGS DOUBLE,
      TotalExpenses DOUBLE,
      Marketing DOUBLE,
      Inventory DOUBLE,
      BudgetProfit DOUBLE,
      BudgetCOGS DOUBLE,
      BudgetMargin DOUBLE,
      BudgetSales DOUBLE
)
"""

In [None]:
cursor.execute(sales_table_query)

In [None]:
insert_query = "INSERT INTO Sales (ProductID, Date, Area_Code, Profit, Margin, Sales, COGS, TotalExpenses, Marketing, Inventory, BudgetProfit, BudgetCOGS, BudgetMargin, BudgetSales) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"

In [None]:
data = df.values.tolist()

In [None]:
sales_table_query = """
    CREATE TABLE Sales (
      ProductID INT,
      `Date` DATE,
      Area_Code INT,
      Profit DOUBLE,
      Margin DOUBLE,
      Sales DOUBLE,
      COGS DOUBLE,
      TotalExpenses DOUBLE,
      Marketing DOUBLE,
      Inventory DOUBLE,
      BudgetProfit DOUBLE,
      BudgetCOGS DOUBLE,
      BudgetMargin DOUBLE,
      BudgetSales DOUBLE
)
"""

In [None]:
cursor.execute(sales_table_query)

In [None]:
insert_query = "INSERT INTO Sales (ProductID, Date, Area_Code, Profit, Margin, Sales, COGS, TotalExpenses, Marketing, Inventory, BudgetProfit, BudgetCOGS, BudgetMargin, BudgetSales) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"

In [None]:
data = df.values.tolist()

In [None]:
cursor.executemany(insert_query, data)

In [None]:
connection.commit()

In [None]:
df = calender.toPandas()

In [None]:
Calender_table_query = """
    CREATE TABLE Calender (
      Date TIMESTAMP,
      Day VARCHAR(255),
      Month VARCHAR(255),
      Year INT
)
"""

In [None]:
cursor.execute(Calender_table_query)

In [None]:
insert_query = "INSERT INTO Calender (Date, Day, Month, Year) VALUES (%s, %s, %s, %s)"

In [None]:
data = df.values.tolist()

In [None]:
cursor.executemany(insert_query, data)

In [None]:
connection.commit()

In [None]:
cursor.close()

In [None]:
connection.close()

In [None]:
data.show()