
Consider a leading two-wheeler manufacturing unit that produces components to designs provided by various automobile companies.

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=871ae37edf0acbe3272143f418a420bc6e6b829448077d6d67c8fe2ff9ae6e54
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
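from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

# Start a local Spark context for this notebook
sc = SparkContext(master='local', appName='AMS')
# Wrap the context in a SparkSession, the entry point for DataFrames
spark = SparkSession(sc)
# SQLContext is deprecated since Spark 3.0 and is kept here only for compatibility
sqlContext = SQLContext(sc)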



### Create namedtuples for the schemas provided for both datasets

In [24]:
from collections import namedtuple
order=namedtuple('order',['OrderID','CompanyID','partsRequired','OrderQuantity',
                          'CostOfPart','OrderDate','ExpectedDeliveryDate','DeliveryStatus'])
company=namedtuple('company',['CompanyID','CompanyName','CompanyLocation','CompanyContact','EstablishedYear'])

### Load both the csv files into RDDs

In [25]:
OrderBookRDD = sc.textFile('/content/OrderBook.csv')
CompanyDetailsRDD = sc.textFile('/content/CompanyDetails.csv')

In [26]:
OrderBookRDD = OrderBookRDD.map(lambda col:col.split(','))
CompanyDetailsRDD = CompanyDetailsRDD.map(lambda col:col.split(','))

In [27]:
display(OrderBookRDD.take(2))
CompanyDetailsRDD.take(2)

[['OD10001',
  'C1108',
  'Spring Forks',
  '21103',
  '9487867.48',
  '3-Nov-19',
  '24-Dec-19',
  'Pending'],
 ['OD10002',
  'C1035',
  'Fender',
  '26272',
  '4899369.32',
  '18-Dec-19',
  '14-Mar-20',
  'Pending']]

[['C1001', 'Steel Wheels', 'Pennsylvania', '7172809057', '2007'],
 ['C1002', 'MotoLab Bikes', 'Texas', '6829309477', '2011']]
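
The `split(',')` used above works for the rows shown because none of their fields contains a comma. As a minimal sketch, assuming a field could be double-quoted and contain a comma (for example a company name), Python's `csv` module can parse each line instead. The helper name `parse_csv_line` and the sample line are hypothetical and not part of the notebook.

```python
import csv

def parse_csv_line(line):
    """Split one CSV line into fields, honoring double-quoted fields."""
    # csv.reader accepts any iterable of lines; wrap the single line in a list
    return next(csv.reader([line]))

# Hypothetical line with a quoted comma inside the company name
sample = 'C9999,"Acme, Inc.",Texas,5550000000,1999'
print(parse_csv_line(sample))
print(sample.split(','))  # the naive split yields six fields instead of five
```

The helper could replace the lambda in the loading step, e.g. `sc.textFile('/content/OrderBook.csv').map(parse_csv_line)`.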

### Convert both the datasets to DataFrames

In [28]:
order_df = OrderBookRDD.map(lambda c : order(c[0],c[1],c[2],int(c[3]),float(c[4]),c[5],c[6],c[7])).toDF()
order_df.printSchema()

root
 |-- OrderID: string (nullable = true)
 |-- CompanyID: string (nullable = true)
 |-- partsRequired: string (nullable = true)
 |-- OrderQuantity: long (nullable = true)
 |-- CostOfPart: double (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- ExpectedDeliveryDate: string (nullable = true)
 |-- DeliveryStatus: string (nullable = true)
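
`OrderDate` and `ExpectedDeliveryDate` remain strings in this schema, so they cannot be compared or sorted chronologically as they are. Below is a minimal sketch of parsing them, assuming every value follows the day-month-year pattern seen in the sample rows (such as `3-Nov-19`) and that month abbreviations are in English. `parse_order_date` is a hypothetical helper name.

```python
from datetime import date, datetime

def parse_order_date(text):
    """Parse strings such as '3-Nov-19' into a datetime.date."""
    # %d = day of month, %b = abbreviated month name, %y = two-digit year
    return datetime.strptime(text, '%d-%b-%y').date()

# Dates taken from the first OrderBook row shown earlier
order_date = parse_order_date('3-Nov-19')
expected = parse_order_date('24-Dec-19')
lead_time_days = (expected - order_date).days
print(order_date, expected, lead_time_days)
```

Applied inside the `map` before `toDF()`, a helper like this should let Spark infer a date column instead of a string.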



In [29]:
company_df = spark.createDataFrame(CompanyDetailsRDD.map(lambda c : company(c[0],c[1],c[2],int(c[3]),int(c[4]))))
company_df.printSchema()

root
 |-- CompanyID: string (nullable = true)
 |-- CompanyName: string (nullable = true)
 |-- CompanyLocation: string (nullable = true)
 |-- CompanyContact: long (nullable = true)
 |-- EstablishedYear: long (nullable = true)



In [30]:
print('Total records in OrderBook DF : ', order_df.count())
order_df.show(3)

print('Total records in CompanyDetails DF : ', company_df.count())
company_df.show(3)

Total records in OrderBook DF :  1000
+-------+---------+-------------+-------------+----------+---------+--------------------+--------------+
|OrderID|CompanyID|partsRequired|OrderQuantity|CostOfPart|OrderDate|ExpectedDeliveryDate|DeliveryStatus|
+-------+---------+-------------+-------------+----------+---------+--------------------+--------------+
|OD10001|    C1108| Spring Forks|        21103|9487867.48| 3-Nov-19|           24-Dec-19|       Pending|
|OD10002|    C1035|       Fender|        26272|4899369.32|18-Dec-19|           14-Mar-20|       Pending|
|OD10003|    C1174|    Generator|        39789|4008848.49| 7-Dec-19|           31-Jan-20|     Delivered|
+-------+---------+-------------+-------------+----------+---------+--------------------+--------------+
only showing top 3 rows

Total records in CompanyDetails DF :  200
+---------+-------------+---------------+--------------+---------------+
|CompanyID|  CompanyName|CompanyLocation|CompanyContact|EstablishedYear|
+---------+---

### Fetch the details of all the companies that are at least 15 years old

In [31]:
# 2023 is hard-coded here as the reference (current) year
comp_15yrs_old = company_df.filter((2023 - company_df.EstablishedYear) >= 15)
print('Total number of companies that are at least 15 years old: ', comp_15yrs_old.count())
comp_15yrs_old.show()

Total number of companies that are at least 15 years old:  150
+---------+----------------+---------------+--------------+---------------+
|CompanyID|     CompanyName|CompanyLocation|CompanyContact|EstablishedYear|
+---------+----------------+---------------+--------------+---------------+
|    C1001|    Steel Wheels|   Pennsylvania|    7172809057|           2007|
|    C1003|        Lee Auto|     California|    6193601290|           1994|
|    C1004|     Zoom Dynamo|          Texas|    8327309957|           1993|
|    C1005|Quakesons Motors|          Texas|    8175902345|           2002|
|    C1007|     Raido Bikes|         Hawaii|    8086946394|           2001|
|    C1009| Ilmio Jane Moto|   Pennsylvania|    6108768782|           2007|
|    C1011|    Steel Wheels|        Arizona|    6026446720|           1992|
|    C1012|Wingmo Standards|     Washington|    5091629062|           2006|
|    C1013|    Craiko Roads|           Iowa|    3191050692|           1990|
|    C1014|      Reo Bailio| 
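
The filter above uses 2023 as a fixed reference year, so the result would drift if the notebook were rerun in a later year. Here is a small sketch of the same age check with the reference year taken from the system clock by default. `is_at_least_n_years_old` and its parameters are hypothetical names.

```python
from datetime import date

def is_at_least_n_years_old(established_year, n=15, reference_year=None):
    """Return True if a company founded in established_year is at least n years old."""
    if reference_year is None:
        reference_year = date.today().year  # default to the current calendar year
    return (reference_year - established_year) >= n

# Checked against the notebook's reference year of 2023
steel_wheels = is_at_least_n_years_old(2007, reference_year=2023)   # 2023 - 2007 = 16
motolab_bikes = is_at_least_n_years_old(2011, reference_year=2023)  # 2023 - 2011 = 12
print(steel_wheels, motolab_bikes)
```

In the DataFrame filter, the literal `2023` could likewise be replaced by `date.today().year`.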

### Fetch the names of the companies that have ordered “Brakes”

In [33]:
join_df = order_df.join(other=company_df, on='CompanyID', how='inner')
join_df.filter("partsRequired=='Brakes'").select(['CompanyName','partsRequired']).show()

+------------------+-------------+
|       CompanyName|partsRequired|
+------------------+-------------+
|       Zoom Dynamo|       Brakes|
|      Craiko Roads|       Brakes|
|        Kioto Moto|       Brakes|
|       Raido Bikes|       Brakes|
|       Raido Bikes|       Brakes|
|      Laser Wheels|       Brakes|
|  Quakesons Motors|       Brakes|
|      Steel Wheels|       Brakes|
|        Reo Bailio|       Brakes|
|        Kioto Moto|       Brakes|
|      Damson Bikes|       Brakes|
|          Lee Auto|       Brakes|
|      Damson Bikes|       Brakes|
|        Kioto Moto|       Brakes|
|Maino World Motors|       Brakes|
|           Neopins|       Brakes|
|      Steel Wheels|       Brakes|
|          Lee Auto|       Brakes|
|  Quakesons Motors|       Brakes|
|           Neopins|       Brakes|
+------------------+-------------+
only showing top 20 rows



In [34]:
# count() counts joined order rows, so a company with several "Brakes" orders is counted more than once
print('Total number of "Brakes" orders: ', join_df.filter("partsRequired=='Brakes'").count())

Total number of "Brakes" orders:  42
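
The figure 42 is the number of joined rows whose part is “Brakes”, and the table shows some companies more than once (for example Raido Bikes and Kioto Moto). The plain-Python sketch below shows the difference between counting rows and counting distinct names, using the first five names from the output above as sample data. On the DataFrame side the analogous step would be `select('CompanyName').distinct().count()`.

```python
# First five company names from the "Brakes" output above
brake_rows = ['Zoom Dynamo', 'Craiko Roads', 'Kioto Moto', 'Raido Bikes', 'Raido Bikes']

order_count = len(brake_rows)          # one entry per order row
company_count = len(set(brake_rows))   # each company name counted once

print(order_count, company_count)
```

Names are not unique identifiers in this dataset: `Steel Wheels` appears under both `C1001` and `C1011` in the company table, so counting distinct `CompanyID` values may be the safer choice.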


### Find the total number of “Exhaust Pipes” that have been ordered from “New York” location

In [35]:
# count() returns the number of matching order rows, not the sum of OrderQuantity
temp = join_df.filter((join_df.partsRequired == 'Exhaust Pipes') & (join_df.CompanyLocation == 'New York'))
print('Number of “Exhaust Pipes” orders from the “New York” location: ', temp.count())

Number of “Exhaust Pipes” orders from the “New York” location:  2
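
`count()` here returns the number of matching order rows. If the question is read as the total units ordered, the `OrderQuantity` values would be summed instead. The plain-Python sketch below contrasts the two readings using made-up rows. The `Row` type and all quantities are illustrative and not taken from the dataset.

```python
from collections import namedtuple

# Hypothetical, simplified order rows; values are invented for illustration
Row = namedtuple('Row', ['partsRequired', 'CompanyLocation', 'OrderQuantity'])
rows = [
    Row('Exhaust Pipes', 'New York', 100),
    Row('Exhaust Pipes', 'New York', 250),
    Row('Exhaust Pipes', 'Texas', 400),
    Row('Brakes', 'New York', 75),
]

matches = [r for r in rows
           if r.partsRequired == 'Exhaust Pipes' and r.CompanyLocation == 'New York']

num_orders = len(matches)                             # analogous to temp.count()
total_units = sum(r.OrderQuantity for r in matches)   # analogous to summing OrderQuantity

print(num_orders, total_units)
```

On the DataFrame, `temp.agg({"OrderQuantity": "sum"})` would give the second figure, in the same style the notebook uses for the North Carolina totals.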


### Find the total count of all the orders that have been “Delivered”

In [36]:
temp = order_df.filter("DeliveryStatus=='Delivered'")
print('Total count of all the orders that have been Delivered: ', temp.count())

Total count of all the orders that have been Delivered:  255


### Find the total count of all the orders that are “Pending”

In [37]:
temp = order_df.filter("DeliveryStatus=='Pending'")
print('Total count of all the orders that are Pending: ', temp.count())

Total count of all the orders that are Pending:  745
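
The Delivered and Pending counts can also be obtained in one pass by grouping on the status. Below is a plain-Python sketch of that idea with `collections.Counter` on invented sample values. On the DataFrame the analogous call would be `order_df.groupBy('DeliveryStatus').count()`.

```python
from collections import Counter

# Invented sample of DeliveryStatus values
statuses = ['Pending', 'Pending', 'Delivered', 'Pending', 'Delivered']

status_counts = Counter(statuses)   # tallies each distinct status in a single pass
print(status_counts['Delivered'], status_counts['Pending'])

# The per-status counts should add up to the total number of orders
total_orders = sum(status_counts.values())
print(total_orders == len(statuses))
```

The notebook's own figures pass the same sanity check: 255 + 745 = 1000, which matches the OrderBook record count.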


### Find the total quantity and cost of all the parts ordered from the “North Carolina” location

In [38]:
temp = join_df.filter(join_df.CompanyLocation=='North Carolina')

In [39]:
temp.agg({"OrderQuantity": "sum", "CostOfPart": "sum"}).show()

+--------------------+------------------+
|     sum(CostOfPart)|sum(OrderQuantity)|
+--------------------+------------------+
|1.4936626448999998E8|            974157|
+--------------------+------------------+
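
The cost total is printed in scientific notation with a trailing `...8999998`, which is typical of binary floating-point summation rather than an error in the data. Here is a small sketch of how the value could be shown to two decimal places, and of why `decimal.Decimal` is sometimes preferred for monetary sums. The `0.1` example is generic and not from the dataset.

```python
from decimal import Decimal

total_cost = 1.4936626448999998e8    # sum(CostOfPart) from the output above
formatted = f"{total_cost:,.2f}"     # fixed-point with thousands separators
print(formatted)

# Generic illustration of float drift versus exact decimal arithmetic
float_sum = sum([0.1] * 10)
decimal_sum = sum([Decimal('0.1')] * 10)
print(float_sum == 1.0, decimal_sum == Decimal('1.0'))
```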



### Display the order date, expected delivery date, and company name for orders of “Exhaust Pipes”

In [40]:
join_df.filter(join_df.partsRequired=='Exhaust Pipes').select(['OrderDate', 'ExpectedDeliveryDate', 'CompanyName']).show()

+---------+--------------------+------------------+
|OrderDate|ExpectedDeliveryDate|       CompanyName|
+---------+--------------------+------------------+
|12-Nov-19|            8-Jan-20|Maino World Motors|
|30-Nov-19|           25-Jan-20|Maino World Motors|
|27-Oct-19|           28-Jan-20|   Ilmio Jane Moto|
|30-Oct-19|            7-Feb-20|       Raido Bikes|
|18-Dec-19|           12-Feb-20|Maino World Motors|
|19-Oct-19|           23-Mar-20|      Laser Wheels|
|14-Nov-19|            9-Feb-20|       Zoom Dynamo|
|24-Oct-19|           22-Mar-20|        Reo Bailio|
| 6-Dec-19|           14-Jan-20|        Reo Bailio|
|20-Oct-19|           26-Feb-20|       Zoom Dynamo|
|27-Nov-19|           27-Feb-20|   Ilmio Jane Moto|
|15-Dec-19|           25-Jan-20|  Wingmo Standards|
|10-Nov-19|           20-Mar-20|   Ilmio Jane Moto|
|18-Oct-19|           13-Mar-20|Maino World Motors|
|25-Oct-19|           28-Feb-20|Maino World Motors|
|12-Oct-19|            2-Mar-20|Maino World Motors|
|30-Oct-19| 