## Analisando os dados do arquivo "Production.Product.csv"

In [1]:
#importando as bibliotecas necessarias

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
#Enviar os dados para o HDFS(executado no terminal do Linux)
!hdfs dfs -mkdir /user/ronnan/data/data_project_rox
!hdfs dfs -put input/data_project_rox/*.csv /user/ronnan/data/data_project_rox

mkdir: `/user/ronnan/data/data_project_rox': File exists
put: `input/data_project_rox/*.csv': No such file or directory


In [3]:
#Verificando se os dados já estão no HDFS
!hdfs dfs -ls /user/ronnan/data/data_project_rox

Found 6 items
-rw-r--r--   3 root supergroup   13646947 2022-06-19 22:17 /user/ronnan/data/data_project_rox/Person.Person.csv
-rw-r--r--   3 root supergroup     104823 2022-06-19 22:17 /user/ronnan/data/data_project_rox/Production.Product.csv
-rw-r--r--   3 root supergroup    1813963 2022-06-19 22:17 /user/ronnan/data/data_project_rox/Sales.Customer.csv
-rw-r--r--   3 root supergroup   13801182 2022-06-19 22:17 /user/ronnan/data/data_project_rox/Sales.SalesOrderDetail.csv
-rw-r--r--   3 root supergroup    8267704 2022-06-19 22:17 /user/ronnan/data/data_project_rox/Sales.SalesOrderHeader.csv
-rw-r--r--   3 root supergroup      36680 2022-06-19 22:17 /user/ronnan/data/data_project_rox/Sales.SpecialOfferProduct.csv


In [4]:
#criando data_frame para tratamento dos dados, realizando leitura em csv

data_product = spark.read.csv("/user/ronnan/data/data_project_rox/Production.Product.csv"
                            ,sep = ";",inferSchema=True, header=True,ignoreLeadingWhiteSpace=True)

In [5]:
# verificando se o esquema foi atribuido
print(data_product.printSchema())

root
 |-- ProductID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- ProductNumber: string (nullable = true)
 |-- MakeFlag: integer (nullable = true)
 |-- FinishedGoodsFlag: integer (nullable = true)
 |-- Color: string (nullable = true)
 |-- SafetyStockLevel: integer (nullable = true)
 |-- ReorderPoint: integer (nullable = true)
 |-- StandardCost: string (nullable = true)
 |-- ListPrice: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- SizeUnitMeasureCode: string (nullable = true)
 |-- WeightUnitMeasureCode: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- DaysToManufacture: integer (nullable = true)
 |-- ProductLine: string (nullable = true)
 |-- Class: string (nullable = true)
 |-- Style: string (nullable = true)
 |-- ProductSubcategoryID: string (nullable = true)
 |-- ProductModelID: string (nullable = true)
 |-- SellStartDate: timestamp (nullable = true)
 |-- SellEndDate: string (nullable = true)
 |-- DiscontinuedDate: string (

In [6]:
#verificando os dados da coluna ProductID
data_product.select('ProductID').describe().show()

+-------+------------------+
|summary|         ProductID|
+-------+------------------+
|  count|               504|
|   mean| 673.0396825396825|
| stddev|229.37314180957824|
|    min|                 1|
|    max|               999|
+-------+------------------+



In [7]:
#verificando os dados da coluna Name
data_product.select('Name').describe().show()

+-------+-----------------+
|summary|             Name|
+-------+-----------------+
|  count|              504|
|   mean|             null|
| stddev|             null|
|    min|     AWC Logo Cap|
|    max|Women's Tights, S|
+-------+-----------------+



In [8]:
#explorando os dados das colunas 'ProductID','Name','MakeFlag'
data_product.select('ProductID','Name','MakeFlag').show()

+---------+--------------------+--------+
|ProductID|                Name|MakeFlag|
+---------+--------------------+--------+
|        1|     Adjustable Race|       0|
|        2|        Bearing Ball|       0|
|        3|     BB Ball Bearing|       1|
|        4|Headset Ball Bear...|       0|
|      316|               Blade|       1|
|      317|         LL Crankarm|       0|
|      318|         ML Crankarm|       0|
|      319|         HL Crankarm|       0|
|      320|     Chainring Bolts|       0|
|      321|       Chainring Nut|       0|
|      322|           Chainring|       0|
|      323|          Crown Race|       0|
|      324|         Chain Stays|       1|
|      325|             Decal 1|       0|
|      326|             Decal 2|       0|
|      327|           Down Tube|       1|
|      328|   Mountain End Caps|       1|
|      329|       Road End Caps|       1|
|      330|    Touring End Caps|       1|
|      331|            Fork End|       1|
+---------+--------------------+--

In [9]:
# explorando os dados de cada coluna para verificar se a necessidade de alguma normalização

data_product_ProductID = data_product.groupBy("ProductID").count()
data_product_ProductID.show()

data_product_Name = data_product.groupBy("Name").count()
data_product_Name.show()

data_product_MakeFlag = data_product.groupBy("MakeFlag").count()
data_product_MakeFlag.show()

data_product_FinishedGoodsFlag = data_product.groupBy("FinishedGoodsFlag").count()
data_product_FinishedGoodsFlag.show()

data_product_Color = data_product.groupBy("Color").count()
data_product_Color.show()

data_product_SafetyStockLevel = data_product.groupBy("SafetyStockLevel").count()
data_product_SafetyStockLevel.show()

data_product_ReorderPoint = data_product.groupBy("ReorderPoint").count()
data_product_ReorderPoint.show()

data_product_StandardCost = data_product.groupBy("StandardCost").count()
data_product_StandardCost.show()

data_product_ListPrice= data_product.groupBy("ListPrice").count()
data_product_ListPrice.show()

data_product_Size= data_product.groupBy("Size").count()
data_product_Size.show()

data_product_SizeUnitMeasureCode= data_product.groupBy("SizeUnitMeasureCode").count()
data_product_SizeUnitMeasureCode.show()

data_product_WeightUnitMeasureCode= data_product.groupBy("WeightUnitMeasureCode").count()
data_product_WeightUnitMeasureCode.show()

data_product_Weight= data_product.groupBy("Weight").count()
data_product_Weight.show()

data_product_DaysToManufacture= data_product.groupBy("DaysToManufacture").count()
data_product_DaysToManufacture.show()

data_product_ProductLine = data_product.groupBy("ProductLine").count()
data_product_ProductLine.show()

data_product_Class = data_product.groupBy("Class").count()
data_product_Class.show()

data_product_Style= data_product.groupBy("Style").count()
data_product_Style.show()

data_product_ProductSubcategoryID= data_product.groupBy("ProductSubcategoryID").count()
data_product_ProductSubcategoryID.show()

data_product_ProductModelID= data_product.groupBy("ProductModelID").count()
data_product_ProductModelID.show()

data_product_SellStartDate= data_product.groupBy("SellStartDate").count()
data_product_SellStartDate.show()

data_product_SellEndDate= data_product.groupBy("SellEndDate").count()
data_product_SellEndDate.show()

data_product_DiscontinuedDate= data_product.groupBy("DiscontinuedDate").count()
data_product_DiscontinuedDate.show()

data_product_rowguid= data_product.groupBy("rowguid").count()
data_product_rowguid.show()

data_product_ModifiedDate= data_product.groupBy("ModifiedDate").count()
data_product_ModifiedDate.show()


+---------+-----+
|ProductID|count|
+---------+-----+
|      463|    1|
|      471|    1|
|      496|    1|
|      833|    1|
|      392|    1|
|      737|    1|
|      858|    1|
|      897|    1|
|      516|    1|
|      451|    1|
|      808|    1|
|      458|    1|
|      879|    1|
|      883|    1|
|      481|    1|
|      799|    1|
|      804|    1|
|      898|    1|
|      970|    1|
|      472|    1|
+---------+-----+
only showing top 20 rows

+--------------------+-----+
|                Name|count|
+--------------------+-----+
|HL Road Frame - R...|    1|
|             Decal 1|    1|
|ML Road Frame - R...|    1|
|Mountain-200 Blac...|    1|
|Headlights - Weat...|    1|
|      Lock Washer 10|    1|
|           Down Tube|    1|
|             ML Fork|    1|
|   Women's Tights, S|    1|
|Touring-1000 Yell...|    1|
|Touring-1000 Blue...|    1|
|ML Touring Seat A...|    1|
|  ML Road Rear Wheel|    1|
|HL Road Frame - B...|    1|
|Short-Sleeve Clas...|    1|
|ML Mountain Frame..

In [10]:
# criando variavel para ajustar o campo para novo formato de data para date
data_product_data_formatada_start = data_product.withColumn("SellStartDate",to_timestamp(col("SellStartDate")))\
                                        .withColumn("SellStartDate",to_date(col("SellStartDate")))

In [11]:
# criando variavel para ajustar o campo para novo formato de data para date

data_product_data_formatada_end = data_product_data_formatada_start.withColumn("SellEndDate",to_timestamp(col("SellEndDate")))\
                                        .withColumn("SellEndDate",to_date(col("SellEndDate")))

In [12]:
data_product_data_formatada_modified = data_product_data_formatada_end.withColumn("ModifiedDate",to_timestamp(col("ModifiedDate")))\
                                        .withColumn("ModifiedDate",to_date(col("ModifiedDate")))

In [13]:
data_product_data_formatada_modified.select('SellStartDate','SellEndDate','ModifiedDate').show(200)

+-------------+-----------+------------+
|SellStartDate|SellEndDate|ModifiedDate|
+-------------+-----------+------------+
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30|       null|  2014-02-08|
|   2008-04-30| 

In [14]:
# salvando os dados tratados em um unico em formato .parquet e compressão snappy
data_product_data_formatada_modified.coalesce(1).write.saveAsTable("data_product_parquet_save", format="parquet", compression="snappy")

In [15]:
# verificando o arquivo salvo em .parquet
!hdfs dfs -ls /user/hive/warehouse/data_product_parquet_save

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-06-20 20:40 /user/hive/warehouse/data_product_parquet_save/_SUCCESS
-rw-r--r--   2 root supergroup      39110 2022-06-20 20:40 /user/hive/warehouse/data_product_parquet_save/part-00000-02ba7b27-70f2-4268-ba5b-044cb00ceeef-c000.snappy.parquet


In [16]:
# salvando em um unico arquivo uma versão em .csv já tratada --> 
data_product_data_formatada_modified.coalesce(1).write.csv("data_product_final_csv", header=True)

In [17]:
# verificando o caminho que o arquivo foi salvo
!hdfs dfs -ls /user/root/data_product_final_csv

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-06-20 20:41 /user/root/data_product_final_csv/_SUCCESS
-rw-r--r--   2 root supergroup      88189 2022-06-20 20:41 /user/root/data_product_final_csv/part-00000-7a62cd9e-ac2b-44bc-a706-50cf0c3616a5-c000.csv


In [18]:
#exportar o arquivo snappy para fora do hdfs(Executar no terminal hdfs dfs)

hdfs dfs -get /user/hive/warehouse/data_product_parquet_save/part-00000-02ba7b27-70f2-4268-ba5b-044cb00ceeef-c000.snappy.parquet input/data_project_rox/

sudo mv part-00000-02ba7b27-70f2-4268-ba5b-044cb00ceeef-c000.snappy.parquet production.product.parquet

In [19]:
#exportar o arquivo .csv para fora do hdfs(Executar no terminal hdfs dfs)

hdfs dfs -get /user/root/data_product_final_csv/part-00000-7a62cd9e-ac2b-44bc-a706-50cf0c3616a5-c000.csv  input/data_project_rox/

sudo mv part-00000-7a62cd9e-ac2b-44bc-a706-50cf0c3616a5-c000.csv production.product_tratado.csv

In [20]:
#!hdfs dfs -rm -R /user/root/data_product_final_csv

#!hdfs dfs -rm -R /user/hive/warehouse/data_product_parquet
