<a href="https://colab.research.google.com/github/kieutrinhtran/BigData-Homework/blob/main/SparkSQL_SQLContext_HiveContext_31221025966.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# SparkSQL_SQLContext_HiveContext

In [1]:
from datetime import datetime
import pytz
print('Tested',datetime.now(pytz.timezone('Asia/Ho_Chi_Minh')))

Tested 2025-04-17 14:21:36.848315+07:00


# SPARK SQL
If you wish to run HIVE natively under Hadoop please see this notebook [Hadoop and Hive](https://github.com/prithwis/KKolab/blob/main/KK_B2_Hadoop_and_Hive.ipynb)

# Install Spark

## PIP Install Spark

In [2]:
#!pip3 -q install pyspark==3.0.3    # Problem with newer spark versions resolved, no need to install older version
!pip3 -q install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sc

# Load Data

In [3]:
# Data files downloaded from author's G-Drive
#!gdown https://drive.google.com/uc?id=1JJH24ZZaiJrEKValD--UtyFcWl7UanwV  # 2% data
#!wget -q 'https://raw.githubusercontent.com/Praxis-QR/BDSN/main/data/OLAP/eCommerce_02PC_2021.csv' # 2% data

#!gdown https://drive.google.com/uc?id=1g7mJ0v4fkERW0HWc1eq-SHs_jvQ0N2Oe  # 100% data
!wget -q 'https://raw.githubusercontent.com/Praxis-QR/BDSN/main/data/OLAP/eCommerce_Full_2021.csv' # 100% data

!ls -al

total 44000
drwxr-xr-x 1 root root     4096 Apr 18 11:13 .
drwxr-xr-x 1 root root     4096 Apr 18 11:12 ..
drwxr-xr-x 4 root root     4096 Apr 16 13:40 .config
-rw-r--r-- 1 root root 45038728 Apr 18 11:13 eCommerce_Full_2021.csv
drwxr-xr-x 1 root root     4096 Apr 16 13:40 sample_data


In [4]:
#we remove the CRLF character from the end of the row if it exists
!sed 's/\r//' /content/eCommerce_Full_2021.csv > datafile.csv
#!sed 's/\r//' /content/eCommerce_02PC_2021.csv > datafile.csv


#sed -i -e "1d" datafile.csv               # remove the first line containing headers from the file

data_file = 'datafile.csv'
raw_data = sc.textFile(data_file)
print ("Data Size", raw_data.count())

Data Size 541910


#Spark Data Frame

## Import dữ liệu vào Spark và xem cấu trúc dữ liệu

In [5]:
# CSV to Spark Dataframe
eCommerce_df = spark.read.csv(data_file,inferSchema=True, header=True)
eCommerce_df.printSchema()          # Spark is smart enough to differentiate between text and numbers
                                    # Presence of comma in string fields has been handled correctly

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



## Đếm số giao dịch ở mỗi quốc gia, sắp xếp giảm dần theo số lượng giao dịch

In [6]:
eCommerce_df.select('Quantity','UnitPrice','Country').groupby(eCommerce_df.Country).count().sort('count', ascending = False).show()

+---------------+------+
|        Country| count|
+---------------+------+
| United Kingdom|495478|
|        Germany|  9495|
|         France|  8557|
|           EIRE|  8196|
|          Spain|  2533|
|    Netherlands|  2371|
|        Belgium|  2069|
|    Switzerland|  2002|
|       Portugal|  1519|
|      Australia|  1259|
|         Norway|  1086|
|          Italy|   803|
|Channel Islands|   758|
|        Finland|   695|
|         Cyprus|   622|
|         Sweden|   462|
|    Unspecified|   446|
|        Austria|   401|
|        Denmark|   389|
|          Japan|   358|
+---------------+------+
only showing top 20 rows



## Hiển thị 10 khách hàng mua với doanh số cao nhất, sắp xếp giảm dần theo doanh số

In [9]:
eCommerce_df = eCommerce_df.withColumn("TotalPrice", eCommerce_df["Quantity"] * eCommerce_df["UnitPrice"])

In [10]:
# Nhóm theo CustomerID và tính tổng TotalPrice, sắp xếp giảm dần, lấy top 10
eCommerce_df.groupBy("CustomerID") \
    .sum("TotalPrice") \
    .withColumnRenamed("sum(TotalPrice)", "TotalRevenue") \
    .orderBy("TotalRevenue", ascending=False) \
    .show(10)

+----------+------------------+
|CustomerID|      TotalRevenue|
+----------+------------------+
|      NULL|  1447682.12000015|
|     14646|279489.01999999944|
|     18102|256438.48999999993|
|     17450|187482.16999999998|
|     14911|132572.61999999994|
|     12415| 123725.4499999999|
|     14156|113384.13999999998|
|     17511| 88125.37999999992|
|     16684| 65892.07999999999|
|     13694|62653.100000000006|
+----------+------------------+
only showing top 10 rows



In [11]:
# xem các dòng có customerid null

from pyspark.sql.functions import col

# Filter rows where CustomerID is null
eCommerce_df.filter(col("CustomerID").isNull()).show()


+---------+---------+--------------------+--------+---------------+---------+----------+--------------+------------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|        TotalPrice|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+------------------+
|   536414|    22139|                NULL|      56|12/1/2010 11:52|      0.0|      NULL|United Kingdom|               0.0|
|   536544|    21773|DECORATIVE ROSE B...|       1|12/1/2010 14:32|     2.51|      NULL|United Kingdom|              2.51|
|   536544|    21774|DECORATIVE CATS B...|       2|12/1/2010 14:32|     2.51|      NULL|United Kingdom|              5.02|
|   536544|    21786|  POLKADOT RAIN HAT |       4|12/1/2010 14:32|     0.85|      NULL|United Kingdom|               3.4|
|   536544|    21787|RAIN PONCHO RETRO...|       2|12/1/2010 14:32|     1.66|      NULL|United Kingdom|              3.32|
|   536544|    2

## Hiển thị doanh thu theo ngày và sắp xếp ngày tăng dần

In [12]:
#đổi tên cột thành InvoiceDateTime
eCommerce_df = eCommerce_df.withColumnRenamed("InvoiceDate", "InvoiceDateTime")

In [18]:
from pyspark.sql.functions import to_date
#thêm cột mới InvoiceDate chỉ chứa date với định dạng yyyy-mm-dd
eCommerce_df = eCommerce_df.withColumn("InvoiceDate", to_date("InvoiceDateTime", "M/d/yyyy H:m"))


In [20]:
eCommerce_df.select("InvoiceDate").show(10)

+-----------+
|InvoiceDate|
+-----------+
| 2010-12-01|
| 2010-12-01|
| 2010-12-01|
| 2010-12-01|
| 2010-12-01|
| 2010-12-01|
| 2010-12-01|
| 2010-12-01|
| 2010-12-01|
| 2010-12-01|
+-----------+
only showing top 10 rows



In [22]:
#xem cấu trúc bảng
eCommerce_df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDateTime: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- TotalPrice: double (nullable = true)
 |-- InvoiceDate: date (nullable = true)



In [26]:
# Nhóm theo ngày, tính tổng doanh thu, sắp xếp ngày tăng dần
eCommerce_df.groupBy("InvoiceDate") \
    .sum("TotalPrice") \
    .withColumnRenamed("sum(TotalPrice)", "DailyRevenue") \
    .orderBy("InvoiceDate", ascending=True) \
    .show()

+-----------+------------------+
|InvoiceDate|      DailyRevenue|
+-----------+------------------+
| 2010-12-01| 58635.56000000026|
| 2010-12-02| 46207.27999999991|
| 2010-12-03|  45620.4599999999|
| 2010-12-05| 31383.95000000016|
| 2010-12-06|53860.180000000015|
| 2010-12-07| 45059.05000000015|
| 2010-12-08|44189.839999999866|
| 2010-12-09| 52532.13000000003|
| 2010-12-10| 57404.91000000017|
| 2010-12-12| 17240.92000000005|
| 2010-12-13| 35379.34000000009|
| 2010-12-14|42843.290000000205|
| 2010-12-15| 29443.69000000013|
| 2010-12-16| 48334.34999999993|
| 2010-12-17| 43534.19000000001|
| 2010-12-19| 7517.309999999992|
| 2010-12-20|24741.750000000015|
| 2010-12-21|47097.939999999915|
| 2010-12-22| 6134.569999999999|
| 2010-12-23|11796.310000000025|
+-----------+------------------+
only showing top 20 rows



# Spark SQL
What is the difference between SQLContext and HiveContext? See [here](https://intellipaat.com/community/7599/what-is-the-difference-between-apache-spark-sqlcontext-vs-hivecontext#:~:text=HiveContext%20is%20a%20super%20set,read%20data%20from%20Hive%20tables.&text=The%20more%20basic%20SQLContext%20provides,does%20not%20depend%20on%20Hive.), or [here](https://stackoverflow.com/questions/33666545/what-is-the-difference-between-apache-spark-sqlcontext-vs-hivecontext)

### Tạo bảng tạm t_eCommerce

In [27]:
eCommerce_df.createOrReplaceTempView("t_eCommerce")                    # Create a Spark Table

### Đếm số giao dịch ở mỗi quốc gia và sắp xếp giảm dần

In [28]:
spark.sql("SELECT Country, count(*) count FROM t_eCommerce group by Country order by count desc").show()

+---------------+------+
|        Country| count|
+---------------+------+
| United Kingdom|495478|
|        Germany|  9495|
|         France|  8557|
|           EIRE|  8196|
|          Spain|  2533|
|    Netherlands|  2371|
|        Belgium|  2069|
|    Switzerland|  2002|
|       Portugal|  1519|
|      Australia|  1259|
|         Norway|  1086|
|          Italy|   803|
|Channel Islands|   758|
|        Finland|   695|
|         Cyprus|   622|
|         Sweden|   462|
|    Unspecified|   446|
|        Austria|   401|
|        Denmark|   389|
|          Japan|   358|
+---------------+------+
only showing top 20 rows



### Tổng doanh thu theo quốc gia

In [29]:
spark.sql("SELECT Country, sum(Quantity * UnitPrice) as DoanhThu FROM t_eCommerce group by Country order by DoanhThu desc").show()

+---------------+------------------+
|        Country|          DoanhThu|
+---------------+------------------+
| United Kingdom|8187806.3639986925|
|    Netherlands| 284661.5399999992|
|           EIRE|263276.81999999913|
|        Germany|221698.21000000037|
|         France|197403.90000000037|
|      Australia|137077.26999999987|
|    Switzerland| 56385.35000000011|
|          Spain| 54774.58000000016|
|        Belgium|40910.960000000014|
|         Sweden| 36595.90999999998|
|          Japan|35340.619999999995|
|         Norway| 35163.46000000001|
|       Portugal|29367.019999999953|
|        Finland|22326.739999999994|
|Channel Islands| 20086.28999999999|
|        Denmark| 18768.13999999999|
|          Italy|16890.509999999995|
|         Cyprus|12946.289999999994|
|        Austria|10154.319999999996|
|      Hong Kong|10117.039999999997|
+---------------+------------------+
only showing top 20 rows



In [30]:
#Countries = spark.sql("SELECT * FROM t_eCommerce limit 5")
Countries = spark.sql("SELECT * FROM t_eCommerce")
Countries.show(5, truncate=False)

+---------+---------+-----------------------------------+--------+---------------+---------+----------+--------------+------------------+-----------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDateTime|UnitPrice|CustomerID|Country       |TotalPrice        |InvoiceDate|
+---------+---------+-----------------------------------+--------+---------------+---------+----------+--------------+------------------+-----------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/2010 8:26 |2.55     |17850     |United Kingdom|15.299999999999999|2010-12-01 |
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/2010 8:26 |3.39     |17850     |United Kingdom|20.34             |2010-12-01 |
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/2010 8:26 |2.75     |17850     |United Kingdom|22.0              |2010-12-01 |
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |12/1/2010 8:26 |3.39     |17850   

In [31]:
spark.sql("select distinct Country from t_eCommerce").show()

+------------------+
|           Country|
+------------------+
|            Sweden|
|         Singapore|
|           Germany|
|            France|
|            Greece|
|European Community|
|           Belgium|
|           Finland|
|             Malta|
|       Unspecified|
|             Italy|
|              EIRE|
|         Lithuania|
|            Norway|
|             Spain|
|           Denmark|
|         Hong Kong|
|           Iceland|
|            Israel|
|   Channel Islands|
+------------------+
only showing top 20 rows



In [32]:
spark.sql("select count(distinct Country) CountryCount from t_eCommerce").show()

+------------+
|CountryCount|
+------------+
|          38|
+------------+



In [33]:
#Countries = sqlContext.sql("SELECT sum(Quantity), sum(UnitPrice), Country from eCommerce group by Country order by sum(Quantity) desc")
spark.sql("SELECT sum(Quantity), sum(UnitPrice), Country from t_eCommerce group by Country order by sum(Quantity) desc").show()

+-------------+------------------+---------------+
|sum(Quantity)|    sum(UnitPrice)|        Country|
+-------------+------------------+---------------+
|      4263829| 2245715.473997657| United Kingdom|
|       200128| 6492.549999999972|    Netherlands|
|       142637|48447.190000000934|           EIRE|
|       117448| 37666.00000000061|        Germany|
|       110480| 43031.99000000053|         France|
|        83653| 4054.749999999998|      Australia|
|        35637|1806.8300000000006|         Sweden|
|        30325| 6813.689999999986|    Switzerland|
|        26824| 12633.44999999992|          Spain|
|        25218| 814.8600000000002|          Japan|
|        23152| 7540.129999999979|        Belgium|
|        19247| 6529.059999999987|         Norway|
|        16180|          13037.54|       Portugal|
|        10666| 3786.850000000005|        Finland|
|         9479|3738.5500000000015|Channel Islands|
|         8188|1266.9499999999991|        Denmark|
|         7999| 3879.3900000000

In [34]:
#Countries = sqlContext.sql("SELECT sum(Quantity), sum(UnitPrice), Country from eCommerce group by Country order by sum(Quantity) desc")
spark.sql("SELECT sum(Quantity) Quantity, round(sum(UnitPrice)) xSumUnitPrice, Country from t_eCommerce group by Country order by Quantity desc").show(40)

+--------+-------------+--------------------+
|Quantity|xSumUnitPrice|             Country|
+--------+-------------+--------------------+
| 4263829|    2245715.0|      United Kingdom|
|  200128|       6493.0|         Netherlands|
|  142637|      48447.0|                EIRE|
|  117448|      37666.0|             Germany|
|  110480|      43032.0|              France|
|   83653|       4055.0|           Australia|
|   35637|       1807.0|              Sweden|
|   30325|       6814.0|         Switzerland|
|   26824|      12633.0|               Spain|
|   25218|        815.0|               Japan|
|   23152|       7540.0|             Belgium|
|   19247|       6529.0|              Norway|
|   16180|      13038.0|            Portugal|
|   10666|       3787.0|             Finland|
|    9479|       3739.0|     Channel Islands|
|    8188|       1267.0|             Denmark|
|    7999|       3879.0|               Italy|
|    6317|       3920.0|              Cyprus|
|    5234|      25109.0|          

In [35]:
spark.sql("SELECT sum(Quantity) Quantity, round(sum(UnitPrice * Quantity)) Sales, Country from t_eCommerce group by Country order by Quantity desc").show(40)

+--------+---------+--------------------+
|Quantity|    Sales|             Country|
+--------+---------+--------------------+
| 4263829|8187806.0|      United Kingdom|
|  200128| 284662.0|         Netherlands|
|  142637| 263277.0|                EIRE|
|  117448| 221698.0|             Germany|
|  110480| 197404.0|              France|
|   83653| 137077.0|           Australia|
|   35637|  36596.0|              Sweden|
|   30325|  56385.0|         Switzerland|
|   26824|  54775.0|               Spain|
|   25218|  35341.0|               Japan|
|   23152|  40911.0|             Belgium|
|   19247|  35163.0|              Norway|
|   16180|  29367.0|            Portugal|
|   10666|  22327.0|             Finland|
|    9479|  20086.0|     Channel Islands|
|    8188|  18768.0|             Denmark|
|    7999|  16891.0|               Italy|
|    6317|  12946.0|              Cyprus|
|    5234|   9120.0|           Singapore|
|    4827|  10154.0|             Austria|
|    4769|  10117.0|           Hon

In [36]:
# Where description contains comma
spark.sql("SELECT InvoiceNo, Description  from t_eCommerce where Description like '%,%'").show(truncate = False)

+---------+----------------------------------+
|InvoiceNo|Description                       |
+---------+----------------------------------+
|536381   |AIRLINE LOUNGE,METAL SIGN         |
|536394   |FANCY FONT BIRTHDAY CARD,         |
|536520   |TRAY, BREAKFAST IN BED            |
|536520   |SWISS ROLL TOWEL, CHOCOLATE  SPOTS|
|536524   |SWISS ROLL TOWEL, CHOCOLATE  SPOTS|
|536531   |BIRTHDAY CARD, RETRO SPOT         |
|536531   |FANCY FONT BIRTHDAY CARD,         |
|536532   |FANCY FONT BIRTHDAY CARD,         |
|536544   |TRAY, BREAKFAST IN BED            |
|536544   |ACRYLIC JEWEL ICICLE, PINK        |
|536544   |TUMBLER, BAROQUE                  |
|536544   |TUMBLER, NEW ENGLAND              |
|536544   |METAL SIGN,CUPCAKE SINGLE HOOK    |
|536544   |METAL SIGN,CUPCAKE SINGLE HOOK    |
|536544   |SET 3 RETROSPOT TEA,COFFEE,SUGAR  |
|536544   |ELEPHANT, BIRTHDAY CARD,          |
|C536548  |HOOK, 1 HANGER ,MAGIC GARDEN      |
|536559   |FEATHER PEN,COAL BLACK            |
|536559   |FE

In [37]:
spark.sql("select count(*) from t_eCommerce where Description like '%,%'").show()

+--------+
|count(1)|
+--------+
|    4796|
+--------+



## Hiển thị 10 khách hàng mua với doanh số cao nhất, sắp xếp giảm dần theo doanh số

In [38]:
spark.sql("""
    SELECT CustomerID, SUM(TotalPrice) AS TotalRevenue
    FROM t_eCommerce
    GROUP BY CustomerID
    ORDER BY TotalRevenue DESC
    LIMIT 10
""").show()


+----------+------------------+
|CustomerID|      TotalRevenue|
+----------+------------------+
|      NULL|  1447682.12000015|
|     14646|279489.01999999944|
|     18102|256438.48999999993|
|     17450|187482.16999999998|
|     14911|132572.61999999994|
|     12415| 123725.4499999999|
|     14156|113384.13999999998|
|     17511| 88125.37999999992|
|     16684| 65892.07999999999|
|     13694|62653.100000000006|
+----------+------------------+



## Hiển thị doanh thu theo ngày và sắp xếp ngày tăng dần

In [39]:
spark.sql("""
    SELECT InvoiceDate, SUM(TotalPrice) AS DailyRevenue
    FROM t_eCommerce
    GROUP BY InvoiceDate
    ORDER BY InvoiceDate ASC
""").show()

+-----------+------------------+
|InvoiceDate|      DailyRevenue|
+-----------+------------------+
| 2010-12-01| 58635.56000000026|
| 2010-12-02| 46207.27999999991|
| 2010-12-03|  45620.4599999999|
| 2010-12-05| 31383.95000000016|
| 2010-12-06|53860.180000000015|
| 2010-12-07| 45059.05000000015|
| 2010-12-08|44189.839999999866|
| 2010-12-09| 52532.13000000003|
| 2010-12-10| 57404.91000000017|
| 2010-12-12| 17240.92000000005|
| 2010-12-13| 35379.34000000009|
| 2010-12-14|42843.290000000205|
| 2010-12-15| 29443.69000000013|
| 2010-12-16| 48334.34999999993|
| 2010-12-17| 43534.19000000001|
| 2010-12-19| 7517.309999999992|
| 2010-12-20|24741.750000000015|
| 2010-12-21|47097.939999999915|
| 2010-12-22| 6134.569999999999|
| 2010-12-23|11796.310000000025|
+-----------+------------------+
only showing top 20 rows



# Data Cleaning

## Remove comma from data of CSV file <br>
The data contains commas which may cause during the load process

In [40]:
#we remove the CRLF character from the end of the row if it exists
!sed 's/\r//' /content/eCommerce_Full_2021.csv > datafile.csv
#!sed 's/\r//g' /content/eCommerce_02PC_2021.csv > datafile.csv
# remove the first line containing headers from the file
!sed -i -e "1d" datafile.csv

Tropical beach, coffee has commas

In [44]:
#!grep 'TROPICAL BEACH' datafile.csv
#!grep 'COFFEE,' datafile.csv
!head datafile.csv

536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,12/1/2010 8:26,7.65,17850,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,12/1/2010 8:26,4.25,17850,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,12/1/2010 8:28,1.85,17850,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,12/1/2010 8:28,1.85,17850,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/2010 8:34,1.69,13047,United Kingdom


commas getting removed here

In [45]:
import pandas as pd
#p_df = pd.read_csv('datafile.csv',header=None)   # throws errors because of non utf-encoding
                                                  # https://stackoverflow.com/questions/18171739/unicodedecodeerror-when-reading-csv-file-in-pandas-with-python
p_df = pd.read_csv('datafile.csv',encoding = "ISO-8859-1",header=None)
#p_df = pd.read_csv('datafile.csv',encoding = "latin",header=None)
#p_df = pd.read_csv('datafile.csv',encoding = "latin")
p_df2 = p_df.replace(',', ' ', regex=True)
p_df2.to_csv('datafile2.csv',index=False,sep=',',header=False)
p_df2.shape

(541909, 8)

In [None]:
#Desc2 = p_df2['Description'].str.encode('ascii','ignore').str.decode('ascii')
#p_df2['Description2'] = Desc2
#p_df2.head(3)
#p_df2.drop("Description", axis=1, inplace=True)
#p_df2.head(3)

In [None]:
#!head datafile2.csv
#!grep 'TROPICAL BEACH' datafile2.csv
#!grep 'COFFEE' datafile2.csv

##SQL Context

In [46]:
#from pyspark.sql import SQLContext
from pyspark.sql import Row

#sqlContext = SQLContext(sc)

In [47]:
data_file = "datafile2.csv"
raw_data = sc.textFile(data_file)
csv_data = raw_data.map(lambda l: l.split(","))

In [48]:
csv_data.take(5)

[['536365',
  '85123A',
  'WHITE HANGING HEART T-LIGHT HOLDER',
  '6',
  '12/1/2010 8:26',
  '2.55',
  '17850.0',
  'United Kingdom'],
 ['536365',
  '71053',
  'WHITE METAL LANTERN',
  '6',
  '12/1/2010 8:26',
  '3.39',
  '17850.0',
  'United Kingdom'],
 ['536365',
  '84406B',
  'CREAM CUPID HEARTS COAT HANGER',
  '8',
  '12/1/2010 8:26',
  '2.75',
  '17850.0',
  'United Kingdom'],
 ['536365',
  '84029G',
  'KNITTED UNION FLAG HOT WATER BOTTLE',
  '6',
  '12/1/2010 8:26',
  '3.39',
  '17850.0',
  'United Kingdom'],
 ['536365',
  '84029E',
  'RED WOOLLY HOTTIE WHITE HEART.',
  '6',
  '12/1/2010 8:26',
  '3.39',
  '17850.0',
  'United Kingdom']]

In [49]:
row_data = csv_data.map(lambda p: Row(
    InvoiceNo=p[0],
    StockCode=p[1],
    Description=p[2],
    Quantity=int(p[3]),
    InvoiceDate=p[4],
    UnitPrice=float(p[5]),
    CustomerID=p[6],
    Country=p[7]
    )
)

In [50]:
row_data.take(2)

[Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate='12/1/2010 8:26', UnitPrice=2.55, CustomerID='17850.0', Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='71053', Description='WHITE METAL LANTERN', Quantity=6, InvoiceDate='12/1/2010 8:26', UnitPrice=3.39, CustomerID='17850.0', Country='United Kingdom')]

In [51]:
#eCommerce2_df = sqlContext.createDataFrame(row_data)
#eCommerce2_df.registerTempTable("eCommerce")
# spark 2 onwards ...
eCommerce2_df = spark.createDataFrame(row_data)
eCommerce2_df.createOrReplaceTempView("eCommerce")

In [52]:
#Countries = sqlContext.sql("SELECT * FROM eCommerce limit 20")
Countries = spark.sql("SELECT * FROM eCommerce limit 5")
Countries.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+



In [53]:
#Countries = sqlContext.sql("SELECT sum(Quantity), sum(UnitPrice), Country from eCommerce group by Country order by sum(Quantity) desc")
Countries = spark.sql("SELECT sum(Quantity), sum(UnitPrice), Country from eCommerce group by Country order by sum(Quantity) desc")
Countries.show()

+-------------+------------------+---------------+
|sum(Quantity)|    sum(UnitPrice)|        Country|
+-------------+------------------+---------------+
|      4263829|2245715.4739978826| United Kingdom|
|       200128| 6492.549999999974|    Netherlands|
|       142637| 48447.19000000096|           EIRE|
|       117448|  37666.0000000006|        Germany|
|       110480| 43031.99000000051|         France|
|        83653| 4054.749999999998|      Australia|
|        35637|           1806.83|         Sweden|
|        30325| 6813.689999999986|    Switzerland|
|        26824|12633.449999999923|          Spain|
|        25218| 814.8600000000001|          Japan|
|        23152| 7540.129999999981|        Belgium|
|        19247| 6529.059999999985|         Norway|
|        16180|          13037.54|       Portugal|
|        10666| 3786.850000000004|        Finland|
|         9479| 3738.550000000003|Channel Islands|
|         8188|1266.9499999999998|        Denmark|
|         7999| 3879.3900000000

### Why is there a difference from t_eCommerce Data?

In [54]:
#Countries = sqlContext.sql("SELECT sum(Quantity), sum(UnitPrice), Country from eCommerce group by Country order by sum(Quantity) desc")
spark.sql("SELECT sum(Quantity), sum(UnitPrice), Country from t_eCommerce group by Country order by sum(Quantity) desc").show()

+-------------+------------------+---------------+
|sum(Quantity)|    sum(UnitPrice)|        Country|
+-------------+------------------+---------------+
|      4263823|2245712.9239976574| United Kingdom|
|       200128| 6492.549999999972|    Netherlands|
|       142637|48447.190000000934|           EIRE|
|       117448| 37666.00000000061|        Germany|
|       110480| 43031.99000000053|         France|
|        83653| 4054.749999999998|      Australia|
|        35637|1806.8300000000006|         Sweden|
|        30325| 6813.689999999986|    Switzerland|
|        26824| 12633.44999999992|          Spain|
|        25218| 814.8600000000002|          Japan|
|        23152| 7540.129999999979|        Belgium|
|        19247| 6529.059999999987|         Norway|
|        16180|          13037.54|       Portugal|
|        10666| 3786.850000000005|        Finland|
|         9479|3738.5500000000015|Channel Islands|
|         8188|1266.9499999999991|        Denmark|
|         7999| 3879.3900000000

| Nguyên nhân chính             | Giải thích |
|------------------------------|---------------------|
| **Schema khác nhau**         | Khi sử dụng DataFrame với `inferSchema=True`, Spark tự động suy đoán kiểu dữ liệu phù hợp (integer, double, string, etc.). Trong khi đó, RDD chỉ xử lý dưới dạng chuỗi (`string`) nếu không ép kiểu thủ công. |
| **Không xử lý header**       | DataFrame có thể đọc và bỏ qua dòng tiêu đề (`header=True`), còn RDD sẽ tính cả dòng header như một bản ghi dữ liệu nếu không xử lý riêng. |
| **Tách cột không chính xác** | RDD thường dùng `split(",")`. Nếu dữ liệu chứa dấu phẩy trong chuỗi (ví dụ `"Nice, white cup"`), thì việc tách sẽ sai cột. Trong khi đó, DataFrame sử dụng parser CSV chuẩn để xử lý các chuỗi phức tạp đúng cách. |
| **Không có metadata schema** | DataFrame lưu schema kèm theo DataFrame, giúp xử lý dễ dàng hơn (tên cột, kiểu dữ liệu). RDD không có thông tin schema trừ khi tự tạo bằng `Row` và `StructType`. |
| **Khả năng tối ưu hóa**      | Spark SQL (dùng với DataFrame) có thể tối ưu truy vấn qua Catalyst Optimizer, còn xử lý RDD là thủ công và ít tối ưu hơn. |


## HIVE context
this part adapted from [this notebook](https://colab.research.google.com/github/jmbanda/BigDataProgramming_2019/blob/master/Chapter_5_Loading_and_Saving_Data_in_Spark.ipynb#scrollTo=lOcCViaN4cwC)

In [55]:
from pyspark.sql import Row
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)




### Import dữ liệu vào Hive

In [56]:
data_file = "datafile2.csv"
raw_data = sc.textFile(data_file)
csv_data = raw_data.map(lambda l: l.split(","))

In [57]:
row_data = csv_data.map(lambda p: Row(
    InvoiceNo=p[0],
    StockCode=p[1],
    Description=p[2],
    Quantity=int(p[3]),
    InvoiceDate=p[4],
    UnitPrice=float(p[5]),
    CustomerID=p[6],
    Country=p[7]
    )
)

In [58]:
ec_schema = spark.createDataFrame(row_data)
# Register it as a temp table
sqlContext.registerDataFrameAsTable(ec_schema, "ec_HiveTable")
sqlContext.sql("show tables").show()

+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|         |ec_hivetable|       true|
|         |   ecommerce|       true|
|         | t_ecommerce|       true|
+---------+------------+-----------+



### Thực hiện Query trong Hive

In [59]:
Countries = sqlContext.sql("SELECT * FROM ec_HiveTable limit 20")
Countries.show(truncate=False)

+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/2010 8:26|2.55     |17850.0   |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/2010 8:26|3.39     |17850.0   |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/2010 8:26|2.75     |17850.0   |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |12/1/2010 8:26|3.39     |17850.0   |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |12/1/2010 8:26|3.39     |17850.0   |United Kingdom|
|536365   |22752    |SET 7 BABUSHKA NESTING BOXES       

In [60]:
Countries = sqlContext.sql("SELECT sum(Quantity) Quantity, round(sum(UnitPrice * Quantity)) Sales, Country from eCommerce group by Country order by Quantity desc")
Countries.show()

+--------+---------+---------------+
|Quantity|    Sales|        Country|
+--------+---------+---------------+
| 4263829|8187806.0| United Kingdom|
|  200128| 284662.0|    Netherlands|
|  142637| 263277.0|           EIRE|
|  117448| 221698.0|        Germany|
|  110480| 197404.0|         France|
|   83653| 137077.0|      Australia|
|   35637|  36596.0|         Sweden|
|   30325|  56385.0|    Switzerland|
|   26824|  54775.0|          Spain|
|   25218|  35341.0|          Japan|
|   23152|  40911.0|        Belgium|
|   19247|  35163.0|         Norway|
|   16180|  29367.0|       Portugal|
|   10666|  22327.0|        Finland|
|    9479|  20086.0|Channel Islands|
|    8188|  18768.0|        Denmark|
|    7999|  16891.0|          Italy|
|    6317|  12946.0|         Cyprus|
|    5234|   9120.0|      Singapore|
|    4827|  10154.0|        Austria|
+--------+---------+---------------+
only showing top 20 rows



In [61]:
from datetime import datetime
import pytz
print('signed off at  ',datetime.now(pytz.timezone('Asia/Ho_Chi_Minh')))

signed off at   2025-04-18 18:35:09.660973+07:00
