In [52]:
# Bootstrap cell: set up findspark, then import the PySpark names used by later cells.
# NOTE(review): findspark.init() presumably has to run before `import pyspark`
# so the package can be found — keep this statement order; confirm for this setup.
import findspark
findspark.init()
import pyspark
# The value returned by findspark.find() is not stored or displayed here.
findspark.find()
# Spark SQL type classes and column helper functions.
from pyspark.sql.types import MapType,StringType,IntegerType,DateType
from pyspark.sql.functions import col, to_timestamp

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Configuration for an application named 'unir' running on the 'local' master.
conf = SparkConf().setAppName('unir').setMaster('local')
# getOrCreate() returns the already-running context when this cell is executed
# again in the same kernel; calling the SparkContext constructor a second time
# fails because only one context may be active at once.
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
from pyspark.sql import HiveContext

# SQL entry point used by every later cell, built on the SparkContext `sc`.
sqlContext = HiveContext(sc)

# Switch to the `default` database and print the tables it contains.
sqlContext.sql("use default")
default_tables = sqlContext.sql("show tables")
default_tables.show()



+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|    sales|      false|
+---------+---------+-----------+



In [68]:
# DDL for the staging table: comma-delimited text, Date kept as a raw string.
stage_sales_ddl = """
CREATE TABLE IF NOT EXISTS Stage.Sales
(Store INT,Dept INT,Date String,Weekly_Sales DECIMAL(10,2),IsHoliday BOOLEAN)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES ('skip.header.line.count'='1')
"""

# DDL for the final table: ORC storage, Date typed as TIMESTAMP.
final_sales_ddl = """
CREATE TABLE IF NOT EXISTS Sales
(Store INT,Dept INT,Date TIMESTAMP,Weekly_Sales DECIMAL(10,2),IsHoliday BOOLEAN)
STORED AS ORC
"""

# Make sure the staging schema exists, then recreate the staging table from scratch.
sqlContext.sql("CREATE SCHEMA IF NOT EXISTS Stage;")
sqlContext.sql("DROP TABLE IF EXISTS Stage.Sales")
sqlContext.sql(stage_sales_ddl)

# Recreate the final table. The name is unqualified, so it is resolved against
# whichever database is current when this cell runs.
sqlContext.sql("DROP TABLE IF EXISTS Sales")
sqlContext.sql(final_sales_ddl)

# Print the tables of each database; the session is left on `default`.
for use_statement in ("use stage", "use default"):
    sqlContext.sql(use_statement)
    sqlContext.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|    stage|    sales|      false|
|    stage|    store|      false|
+---------+---------+-----------+

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|    sales|      false|
|  default|    store|      false|
+---------+---------+-----------+



In [76]:
# DDL for the staging table: comma-delimited text.
stage_store_ddl = """
CREATE TABLE IF NOT EXISTS Stage.Store
(Store Int,Type String, Size Int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES ('skip.header.line.count'='1')
"""

# DDL for the final table: external ORC table at a fixed location.
final_store_ddl = """
CREATE EXTERNAL TABLE IF NOT EXISTS Store
(Store Int,Type String, Size Int)
STORED AS ORC
location '/analytics_equipo_24/external_store_data'
"""

# Recreate the staging table from scratch.
sqlContext.sql("DROP TABLE IF EXISTS Stage.Store")
sqlContext.sql(stage_store_ddl)

# Recreate the final table. The name is unqualified, so it is resolved against
# whichever database is current when this cell runs.
sqlContext.sql("DROP TABLE IF EXISTS Store")
sqlContext.sql(final_store_ddl)

# Print the tables of each database; the session is left on `default`.
for use_statement in ("use stage", "use default"):
    sqlContext.sql(use_statement)
    sqlContext.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|    stage|    sales|      false|
|    stage|    store|      false|
+---------+---------+-----------+

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|    sales|      false|
|  default|    store|      false|
+---------+---------+-----------+



In [78]:
# Recreate the staging table: comma-delimited text, Date kept as a raw string
# because the CSV stores it as dd/MM/yyyy text.
sqlContext.sql("DROP TABLE IF EXISTS Stage.Features")
stage_features_ddl = """
CREATE TABLE IF NOT EXISTS Stage.Features
(Store int, Date string, Temperature decimal(10,2), Fuel_Price decimal(10,2),
MarkDown1 decimal(10,2), MarkDown2 decimal(10,2), MarkDown3 decimal(10,2), MarkDown4 decimal(10,2), MarkDown5 decimal(10,2),
CPI decimal(10,2), Unemployment decimal(10,2), IsHoliday boolean)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES ('skip.header.line.count'='1')
"""
sqlContext.sql(stage_features_ddl)

# Recreate the final ORC table. Date is declared as TIMESTAMP (it was `string`)
# so the declared schema matches the timestamp column the notebook writes into
# this table and agrees with the final Sales table's definition.
# The name is unqualified, so it is resolved against the current database.
sqlContext.sql("DROP TABLE IF EXISTS Features")
final_features_ddl = """
CREATE TABLE IF NOT EXISTS Features
(Store int, Date TIMESTAMP, Temperature decimal(10,2), Fuel_Price decimal(10,2),
MarkDown1 decimal(10,2), MarkDown2 decimal(10,2), MarkDown3 decimal(10,2), MarkDown4 decimal(10,2), MarkDown5 decimal(10,2),
CPI decimal(10,2), Unemployment decimal(10,2), IsHoliday boolean)
STORED AS ORC
"""
sqlContext.sql(final_features_ddl)

# Print the tables of each database; the session is left on `default`.
sqlContext.sql("use stage")
sqlContext.sql("show tables").show()
sqlContext.sql("use default")
sqlContext.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|    stage| features|      false|
|    stage|    sales|      false|
|    stage|    store|      false|
+---------+---------+-----------+

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default| features|      false|
|  default|    sales|      false|
|  default|    store|      false|
+---------+---------+-----------+



In [8]:
import subprocess

# HDFS directory that receives the CSV files, and the local folder prefix of the `hdfs` command.
path = "/analytics_equipo_24"
hdfs_path = "c:\\winutils\\bin\\"
# `-p` lets the command succeed when the directory already exists, so this cell can be re-run.
result = subprocess.run(f"{hdfs_path}hdfs dfs -mkdir -p {path}", shell=True, capture_output=True)
# stdout alone is empty on both success and failure; print stderr when the command fails.
if result.returncode != 0:
    print(result.stderr.decode(errors="replace"))
result.stdout

b''

In [70]:
# Upload the local CSV files to the HDFS directory. The outcome is kept in its
# own variable so a failed copy is reported instead of being overwritten by the
# listing command below.
copy_result = subprocess.run(f"{hdfs_path}hdfs dfs -copyFromLocal c:\\repo\\unir\\*.csv {path}", shell=True, capture_output=True)
if copy_result.returncode != 0:
    print(copy_result.stderr.decode(errors="replace"))
# List the directory and show one entry per line.
result = subprocess.run(f"{hdfs_path}hdfs dfs -ls {path}", shell=True, capture_output=True)
result.stdout.splitlines()

[b'Found 3 items',
 b'-rw-r--r--   1 mcast supergroup     600478 2022-02-19 19:26 /analytics_equipo_24/features.csv',
 b'-rw-r--r--   1 mcast supergroup   13264115 2022-02-19 20:33 /analytics_equipo_24/sales.csv',
 b'-rw-r--r--   1 mcast supergroup        577 2022-02-19 19:26 /analytics_equipo_24/stores.csv']

In [71]:
# Load sales.csv from HDFS into the staging table.
# `path` already starts with "/", so it is appended directly after the host:port
# part; the previous extra slash produced "hdfs://localhost:9000//...".
sqlContext.sql(f"LOAD DATA INPATH 'hdfs://localhost:9000{path}/sales.csv' INTO TABLE stage.sales;")
stage_sales = sqlContext.table("stage.sales")
stage_sales.show()

+-----+----+----------+------------+---------+
|Store|Dept|      Date|Weekly_Sales|IsHoliday|
+-----+----+----------+------------+---------+
| null|null|      Date|        null|     null|
|    1|   1|05/02/2010|    24924.50|    false|
|    1|   1|12/02/2010|    46039.49|     true|
|    1|   1|19/02/2010|    41595.55|    false|
|    1|   1|26/02/2010|    19403.54|    false|
|    1|   1|05/03/2010|    21827.90|    false|
|    1|   1|12/03/2010|    21043.39|    false|
|    1|   1|19/03/2010|    22136.64|    false|
|    1|   1|26/03/2010|    26229.21|    false|
|    1|   1|02/04/2010|    57258.43|    false|
|    1|   1|09/04/2010|    42960.91|    false|
|    1|   1|16/04/2010|    17596.96|    false|
|    1|   1|23/04/2010|    16145.35|    false|
|    1|   1|30/04/2010|    16555.11|    false|
|    1|   1|07/05/2010|    17413.94|    false|
|    1|   1|14/05/2010|    18926.74|    false|
|    1|   1|21/05/2010|    14773.04|    false|
|    1|   1|28/05/2010|    15580.43|    false|
|    1|   1|0

In [83]:
#Inserta data de store.csv a tabla de staging
sqlContext.sql(f"LOAD DATA INPATH 'hdfs://localhost:9000/{path}/stores.csv' INTO TABLE stage.store;")
stage_store = sqlContext.table("stage.store")
stage_store.show()

+-----+----+------+
|Store|Type|  Size|
+-----+----+------+
| null|Type|  null|
|    1|   A|151315|
|    2|   A|202307|
|    3|   B| 37392|
|    4|   A|205863|
|    5|   B| 34875|
|    6|   A|202505|
|    7|   B| 70713|
|    8|   A|155078|
|    9|   B|125833|
|   10|   B|126512|
|   11|   A|207499|
|   12|   B|112238|
|   13|   A|219622|
|   14|   A|200898|
|   15|   B|123737|
|   16|   B| 57197|
|   17|   B| 93188|
|   18|   B|120653|
|   19|   A|203819|
+-----+----+------+
only showing top 20 rows



In [84]:
#Inserta data de features.csv a tabla de stagin
sqlContext.sql(f"LOAD DATA INPATH 'hdfs://localhost:9000/{path}/features.csv' INTO TABLE stage.features;")
stage_features = sqlContext.table("stage.features")
stage_features.show()

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+------+------------+---------+
|Store|      Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|   CPI|Unemployment|IsHoliday|
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+------+------------+---------+
| null|      Date|       null|      null|     null|     null|     null|     null|     null|  null|        null|     null|
|    1|05/02/2010|      42.31|      2.57|     null|     null|     null|     null|     null|211.10|        8.11|    false|
|    1|12/02/2010|      38.51|      2.55|     null|     null|     null|     null|     null|211.24|        8.11|     true|
|    1|19/02/2010|      39.93|      2.51|     null|     null|     null|     null|     null|211.29|        8.11|    false|
|    1|26/02/2010|      46.63|      2.56|     null|     null|     null|     null|     null|211.32|        8.11|    false|
|    1|05/03/2010|      

In [72]:
stage_sales = sqlContext.table("stage.sales")
# Remove the CSV header line that was loaded as a data row (its Date value is
# the literal text 'Date'). Filtering inside Spark replaces the former
# tail(count()-1) round trip, which collected every row to the driver, assumed
# the header was the first row returned, and broke on an empty table.
# eqNullSafe keeps rows whose Date is null; a plain != comparison would drop them.
stage_sales = stage_sales.filter(~col("Date").eqNullSafe("Date"))
# Parse the dd/MM/yyyy text in Date into a timestamp column.
stage_sales = stage_sales.withColumn("Date", to_timestamp('Date', 'dd/MM/yyyy'))
stage_sales.show()
stage_sales.printSchema()

+-----+----+-------------------+------------+---------+
|Store|Dept|               Date|Weekly_Sales|IsHoliday|
+-----+----+-------------------+------------+---------+
|    1|   1|2010-02-05 00:00:00|    24924.50|    false|
|    1|   1|2010-02-12 00:00:00|    46039.49|     true|
|    1|   1|2010-02-19 00:00:00|    41595.55|    false|
|    1|   1|2010-02-26 00:00:00|    19403.54|    false|
|    1|   1|2010-03-05 00:00:00|    21827.90|    false|
|    1|   1|2010-03-12 00:00:00|    21043.39|    false|
|    1|   1|2010-03-19 00:00:00|    22136.64|    false|
|    1|   1|2010-03-26 00:00:00|    26229.21|    false|
|    1|   1|2010-04-02 00:00:00|    57258.43|    false|
|    1|   1|2010-04-09 00:00:00|    42960.91|    false|
|    1|   1|2010-04-16 00:00:00|    17596.96|    false|
|    1|   1|2010-04-23 00:00:00|    16145.35|    false|
|    1|   1|2010-04-30 00:00:00|    16555.11|    false|
|    1|   1|2010-05-07 00:00:00|    17413.94|    false|
|    1|   1|2010-05-14 00:00:00|    18926.74|   

In [91]:
#Insertar datos limpios a tabla final para "sales"
stage_sales.write.format("hive").mode("overwrite").saveAsTable("default.sales")
sales = sqlContext.table("sales")
sales.show()

+-----+----+-------------------+------------+---------+
|Store|Dept|               Date|Weekly_Sales|IsHoliday|
+-----+----+-------------------+------------+---------+
|    1|   1|2010-02-05 00:00:00|    24924.50|    false|
|    1|   1|2010-02-12 00:00:00|    46039.49|     true|
|    1|   1|2010-02-19 00:00:00|    41595.55|    false|
|    1|   1|2010-02-26 00:00:00|    19403.54|    false|
|    1|   1|2010-03-05 00:00:00|    21827.90|    false|
|    1|   1|2010-03-12 00:00:00|    21043.39|    false|
|    1|   1|2010-03-19 00:00:00|    22136.64|    false|
|    1|   1|2010-03-26 00:00:00|    26229.21|    false|
|    1|   1|2010-04-02 00:00:00|    57258.43|    false|
|    1|   1|2010-04-09 00:00:00|    42960.91|    false|
|    1|   1|2010-04-16 00:00:00|    17596.96|    false|
|    1|   1|2010-04-23 00:00:00|    16145.35|    false|
|    1|   1|2010-04-30 00:00:00|    16555.11|    false|
|    1|   1|2010-05-07 00:00:00|    17413.94|    false|
|    1|   1|2010-05-14 00:00:00|    18926.74|   

In [86]:
stage_store = sqlContext.table("stage.store")
# Remove the CSV header line that was loaded as a data row (its Type value is
# the literal text 'Type'). Filtering inside Spark replaces the former
# tail(count()-1) round trip, which collected every row to the driver, assumed
# the header was the first row returned, and broke on an empty table.
# eqNullSafe keeps rows whose Type is null; a plain != comparison would drop them.
stage_store = stage_store.filter(~col("Type").eqNullSafe("Type"))
stage_store.show()
stage_store.printSchema()

+-----+----+------+
|Store|Type|  Size|
+-----+----+------+
|    1|   A|151315|
|    2|   A|202307|
|    3|   B| 37392|
|    4|   A|205863|
|    5|   B| 34875|
|    6|   A|202505|
|    7|   B| 70713|
|    8|   A|155078|
|    9|   B|125833|
|   10|   B|126512|
|   11|   A|207499|
|   12|   B|112238|
|   13|   A|219622|
|   14|   A|200898|
|   15|   B|123737|
|   16|   B| 57197|
|   17|   B| 93188|
|   18|   B|120653|
|   19|   A|203819|
|   20|   A|203742|
+-----+----+------+
only showing top 20 rows

root
 |-- Store: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)



In [88]:
#Insertar datos limpios a tabla final para "store"
stage_store.write.format("hive").mode("overwrite").saveAsTable("default.store")
store = sqlContext.table("store")
store.show()

+-----+----+------+
|Store|Type|  Size|
+-----+----+------+
|    1|   A|151315|
|    2|   A|202307|
|    3|   B| 37392|
|    4|   A|205863|
|    5|   B| 34875|
|    6|   A|202505|
|    7|   B| 70713|
|    8|   A|155078|
|    9|   B|125833|
|   10|   B|126512|
|   11|   A|207499|
|   12|   B|112238|
|   13|   A|219622|
|   14|   A|200898|
|   15|   B|123737|
|   16|   B| 57197|
|   17|   B| 93188|
|   18|   B|120653|
|   19|   A|203819|
|   20|   A|203742|
+-----+----+------+
only showing top 20 rows



In [92]:
stage_features = sqlContext.table("stage.features")
# Remove the CSV header line that was loaded as a data row (its Date value is
# the literal text 'Date'). Filtering inside Spark replaces the former
# tail(count()-1) round trip, which collected every row to the driver, assumed
# the header was the first row returned, and broke on an empty table.
# eqNullSafe keeps rows whose Date is null; a plain != comparison would drop them.
stage_features = stage_features.filter(~col("Date").eqNullSafe("Date"))
# Parse the dd/MM/yyyy text in Date into a timestamp column.
stage_features = stage_features.withColumn("Date", to_timestamp('Date', 'dd/MM/yyyy'))
stage_features.show()
stage_features.printSchema()

+-----+-------------------+-----------+----------+---------+---------+---------+---------+---------+------+------------+---------+
|Store|               Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|   CPI|Unemployment|IsHoliday|
+-----+-------------------+-----------+----------+---------+---------+---------+---------+---------+------+------------+---------+
|    1|2010-02-05 00:00:00|      42.31|      2.57|     null|     null|     null|     null|     null|211.10|        8.11|    false|
|    1|2010-02-12 00:00:00|      38.51|      2.55|     null|     null|     null|     null|     null|211.24|        8.11|     true|
|    1|2010-02-19 00:00:00|      39.93|      2.51|     null|     null|     null|     null|     null|211.29|        8.11|    false|
|    1|2010-02-26 00:00:00|      46.63|      2.56|     null|     null|     null|     null|     null|211.32|        8.11|    false|
|    1|2010-03-05 00:00:00|      46.50|      2.63|     null|     null|     null|   

In [95]:
#Insertar datos limpios a tabla final para "features"
stage_features.write.format("hive").mode("overwrite").saveAsTable("default.features")
features = sqlContext.table("features")
features.show()

+-----+-------------------+-----------+----------+---------+---------+---------+---------+---------+------+------------+---------+
|Store|               Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|   CPI|Unemployment|IsHoliday|
+-----+-------------------+-----------+----------+---------+---------+---------+---------+---------+------+------------+---------+
|    1|2010-02-05 00:00:00|      42.31|      2.57|     null|     null|     null|     null|     null|211.10|        8.11|    false|
|    1|2010-02-12 00:00:00|      38.51|      2.55|     null|     null|     null|     null|     null|211.24|        8.11|     true|
|    1|2010-02-19 00:00:00|      39.93|      2.51|     null|     null|     null|     null|     null|211.29|        8.11|    false|
|    1|2010-02-26 00:00:00|      46.63|      2.56|     null|     null|     null|     null|     null|211.32|        8.11|    false|
|    1|2010-03-05 00:00:00|      46.50|      2.63|     null|     null|     null|   

In [105]:
# Row-count check for each final table, one result per query, in this order.
count_queries = (
    "SELECT COUNT(1) AS ConteoRegistrosSales FROM sales",
    "SELECT COUNT(1) AS ConteoRegistrosStore FROM store",
    "SELECT COUNT(1) AS ConteoRegistrosFeatures FROM features",
)
for count_query in count_queries:
    sqlContext.sql(count_query).show()

+--------------------+
|ConteoRegistrosSales|
+--------------------+
|              421570|
+--------------------+

+--------------------+
|ConteoRegistrosStore|
+--------------------+
|                  45|
+--------------------+

+-----------------------+
|ConteoRegistrosFeatures|
+-----------------------+
|                   8190|
+-----------------------+

