In [4]:
from datetime import datetime

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window

In [3]:
# Build (or reuse) a Spark session running in local mode ("local[*]").
# The app name is meant to carry the start timestamp: the original format
# string had no "{}" placeholder, so the datetime argument was silently dropped.
spark = (
    SparkSession.builder
    .appName("101-pyspark_Testing-{}".format(datetime.today()))
    .master("local[*]")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/15 00:26:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Location of the Online Retail CSV that the next cell reads.
# NOTE(review): absolute, machine-specific path — the notebook will not run on
# another machine without editing this value.
URL = "/home/longnguyen/Documents/Coding/Workshop-AWS/batch-processing-with-amz-emr/data/OnlineRetail.csv"

In [5]:
# Load the CSV: first row is the header, column types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(URL)
)

                                                                                

In [6]:
# Preview the first 10 rows of the loaded DataFrame.
df.show(10)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [7]:
# Rename Columns
# Maps each source CSV header to the name used in the rest of the notebook.
col_renames = dict(
    InvoiceNo='OrderID',
    StockCode='ProductID',
    InvoiceDate='OrderDate',
)

# Apply the renames one at a time; columns not listed keep their name.
for old_name in col_renames:
    df = df.withColumnRenamed(old_name, col_renames[old_name])

df.show(20)

+-------+---------+--------------------+--------+--------------+---------+----------+--------------+
|OrderID|ProductID|         Description|Quantity|     OrderDate|UnitPrice|CustomerID|       Country|
+-------+---------+--------------------+--------+--------------+---------+----------+--------------+
| 536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
| 536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
| 536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
| 536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
| 536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
| 536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
| 536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.25|     17850|United

In [8]:
# Print column names and inferred types after the rename.
df.printSchema()

root
 |-- OrderID: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [9]:
# Change data type for col OrderDate
# Candidate timestamp patterns, tried in order. The original list repeated the
# same pattern three times, which made the extra attempts redundant; append
# further distinct patterns here if the source data ever needs them.
DATE_FORMAT = ["M/d/yyyy H:mm"]

# Strip surrounding whitespace before parsing.
df = df.withColumn("OrderDate", F.trim(F.col("OrderDate")))

# Parse with every candidate pattern and keep the first non-null result.
df = df.withColumn(
    "OrderDate",
    F.coalesce(
        *[F.to_timestamp(F.col("OrderDate"), fmt) for fmt in DATE_FORMAT]
    )
)

In [10]:
# Per-column null tally: cast the isNull() flag to 0/1 and sum it over all rows.
missing_count_expr = [
    F.sum(F.col(name).isNull().cast("int")).alias(name)
    for name in df.columns
]
# Global (ungrouped) aggregation yields a single row of counts.
missing_values_df = df.groupBy().agg(*missing_count_expr)
missing_values_df.show()

[Stage 4:>                                                        (0 + 11) / 11]

+-------+---------+-----------+--------+---------+---------+----------+-------+
|OrderID|ProductID|Description|Quantity|OrderDate|UnitPrice|CustomerID|Country|
+-------+---------+-----------+--------+---------+---------+----------+-------+
|      0|        0|       1454|       0|        0|        0|    135080|      0|
+-------+---------+-----------+--------+---------+---------+----------+-------+



                                                                                

In [11]:
# Print the schema again now that OrderDate has been re-derived with to_timestamp.
df.printSchema()

root
 |-- OrderID: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [12]:
# Preview 10 rows after the OrderDate conversion.
df.show(10)

+-------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|OrderID|ProductID|         Description|Quantity|          OrderDate|UnitPrice|CustomerID|       Country|
+-------+---------+--------------------+--------+-------------------+---------+----------+--------------+
| 536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|
| 536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
| 536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|
| 536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
| 536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
| 536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|     17850|United Kingdom|
| 536365|    21730|GLASS STAR FROSTE...|      

In [13]:
# Collect the distinct order timestamps and peek at 10 of them.
orderDate_unique = df.select(F.col("OrderDate")).dropDuplicates()
orderDate_unique.show(10)

[Stage 8:>                                                        (0 + 11) / 11]

+-------------------+
|          OrderDate|
+-------------------+
|2010-12-02 16:42:00|
|2010-12-03 14:27:00|
|2010-12-05 13:26:00|
|2010-12-06 14:46:00|
|2010-12-07 15:54:00|
|2010-12-15 09:07:00|
|2010-12-16 12:23:00|
|2010-12-21 12:46:00|
|2010-12-02 11:20:00|
|2010-12-05 12:57:00|
+-------------------+
only showing top 10 rows



                                                                                

In [14]:
# Feature Engineering
df = df.withColumn("Day", F.dayofmonth("OrderDate"))
df = df.withColumn("Month", F.month("OrderDate"))
df = df.withColumn("Year", F.year("OrderDate"))

In [15]:
# Preview 5 rows including the new Day/Month/Year columns.
df.show(5)

+-------+---------+--------------------+--------+-------------------+---------+----------+--------------+---+-----+----+
|OrderID|ProductID|         Description|Quantity|          OrderDate|UnitPrice|CustomerID|       Country|Day|Month|Year|
+-------+---------+--------------------+--------+-------------------+---------+----------+--------------+---+-----+----+
| 536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|  1|   12|2010|
| 536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|  1|   12|2010|
| 536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|  1|   12|2010|
| 536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|  1|   12|2010|
| 536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|  1|   12|2010|
+-------+---------+-------------

In [16]:
# Render the time-of-day part of OrderDate as a zero-padded "HH:mm" string.
TIME_FORMAT = "HH:mm"
df = df.withColumn(
    "HourMinute",
    F.date_format(F.col("OrderDate"), TIME_FORMAT),
)

In [17]:
# Preview 5 rows including the new HourMinute column.
df.show(5)

+-------+---------+--------------------+--------+-------------------+---------+----------+--------------+---+-----+----+----------+
|OrderID|ProductID|         Description|Quantity|          OrderDate|UnitPrice|CustomerID|       Country|Day|Month|Year|HourMinute|
+-------+---------+--------------------+--------+-------------------+---------+----------+--------------+---+-----+----+----------+
| 536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|  1|   12|2010|     08:26|
| 536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|  1|   12|2010|     08:26|
| 536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|  1|   12|2010|     08:26|
| 536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|  1|   12|2010|     08:26|
| 536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.

In [18]:
# Re-count nulls per column now that the derived date columns exist.
missing_count_expr = [
    F.sum(F.col(name).isNull().cast("int")).alias(name)
    for name in df.columns
]
# Global (ungrouped) aggregation yields a single row of counts.
missing_values_df = df.groupBy().agg(*missing_count_expr)
missing_values_df.show()

+-------+---------+-----------+--------+---------+---------+----------+-------+---+-----+----+----------+
|OrderID|ProductID|Description|Quantity|OrderDate|UnitPrice|CustomerID|Country|Day|Month|Year|HourMinute|
+-------+---------+-----------+--------+---------+---------+----------+-------+---+-----+----+----------+
|      0|        0|       1454|       0|        0|        0|    135080|      0|  0|    0|   0|         0|
+-------+---------+-----------+--------+---------+---------+----------+-------+---+-----+----+----------+



                                                                                

In [20]:
# fill miss value for CustomerID

# Starting offset for the synthetic CustomerIDs: the largest existing ID.
# F.max yields None when every CustomerID is null; fall back to 0 so the
# generated IDs do not become null as well.
max_customer_id = df.agg(F.max("CustomerID")).collect()[0][0] or 0

# Distinct OrderIDs that have at least one row with a null CustomerID.
order_ids_with_null_customer = (
    df.filter(F.col("CustomerID").isNull()).select("OrderID").distinct()
)

# Number those OrderIDs 1..n (ordered by OrderID) and shift past the current
# maximum, so every such order gets its own new CustomerID.
# The window has no partitionBy, so Spark evaluates it on a single partition
# and logs a WindowExec warning; it runs over the distinct OrderIDs only.
window_spec = Window.orderBy("OrderID")
order_ids_with_new_customer = order_ids_with_null_customer.withColumn(
    "new_CustomerID", F.row_number().over(window_spec) + max_customer_id
)

# Left-join the generated IDs back by OrderID and use them only where the
# original CustomerID is null; the helper column is dropped afterwards.
df = df.join(order_ids_with_new_customer, "OrderID", "left").withColumn(
    "CustomerID",
    F.coalesce(F.col("CustomerID"), F.col("new_CustomerID"))
).drop("new_CustomerID")

In [21]:
# Preview 10 rows after the CustomerID back-fill.
df.show(10)

24/09/15 00:42:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:42:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:42:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:42:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:42:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:42:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 0

+-------+---------+--------------------+--------+-------------------+---------+----------+--------------+---+-----+----+----------+
|OrderID|ProductID|         Description|Quantity|          OrderDate|UnitPrice|CustomerID|       Country|Day|Month|Year|HourMinute|
+-------+---------+--------------------+--------+-------------------+---------+----------+--------------+---+-----+----+----------+
| 536366|    22633|HAND WARMER UNION...|       6|2010-12-01 08:28:00|     1.85|     17850|United Kingdom|  1|   12|2010|     08:28|
| 536366|    22632|HAND WARMER RED P...|       6|2010-12-01 08:28:00|     1.85|     17850|United Kingdom|  1|   12|2010|     08:28|
| 536367|    84879|ASSORTED COLOUR B...|      32|2010-12-01 08:34:00|     1.69|     13047|United Kingdom|  1|   12|2010|     08:34|
| 536367|    22745|POPPY'S PLAYHOUSE...|       6|2010-12-01 08:34:00|      2.1|     13047|United Kingdom|  1|   12|2010|     08:34|
| 536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.

In [22]:
# Re-count nulls per column after the CustomerID back-fill.
missing_count_expr = [
    F.sum(F.col(name).isNull().cast("int")).alias(name)
    for name in df.columns
]
# Global (ungrouped) aggregation yields a single row of counts.
missing_values_df = df.groupBy().agg(*missing_count_expr)
missing_values_df.show()

24/09/15 00:42:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:42:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:42:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:42:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:42:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:42:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 0

+-------+---------+-----------+--------+---------+---------+----------+-------+---+-----+----+----------+
|OrderID|ProductID|Description|Quantity|OrderDate|UnitPrice|CustomerID|Country|Day|Month|Year|HourMinute|
+-------+---------+-----------+--------+---------+---------+----------+-------+---+-----+----+----------+
|      0|        0|       1454|       0|        0|        0|         0|      0|  0|    0|   0|         0|
+-------+---------+-----------+--------+---------+---------+----------+-------+---+-----+----+----------+



In [23]:
# Print the schema after the CustomerID back-fill.
df.printSchema()

root
 |-- OrderID: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- HourMinute: string (nullable = true)



In [24]:
# Replace null values in the Description column with the placeholder "unknown".
df = df.fillna("unknown", subset=["Description"])


In [25]:
# Re-count nulls per column after filling Description.
missing_count_expr = [
    F.sum(F.col(name).isNull().cast("int")).alias(name)
    for name in df.columns
]
# Global (ungrouped) aggregation yields a single row of counts.
missing_values_df = df.groupBy().agg(*missing_count_expr)
missing_values_df.show()

24/09/15 00:48:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:48:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:48:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:48:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:48:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:48:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 0

+-------+---------+-----------+--------+---------+---------+----------+-------+---+-----+----+----------+
|OrderID|ProductID|Description|Quantity|OrderDate|UnitPrice|CustomerID|Country|Day|Month|Year|HourMinute|
+-------+---------+-----------+--------+---------+---------+----------+-------+---+-----+----+----------+
|      0|        0|          0|       0|        0|        0|         0|      0|  0|    0|   0|         0|
+-------+---------+-----------+--------+---------+---------+----------+-------+---+-----+----+----------+



In [26]:
# Discard every row that still has a null in any column.
df = df.na.drop(how="any")

In [27]:
# Keep one copy of each fully identical row.
df = df.distinct()

In [28]:
# Preview 30 rows of the de-duplicated DataFrame.
df.show(30)

24/09/15 00:51:10 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:51:10 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:51:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:51:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:51:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 00:51:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 0

+-------+---------+--------------------+--------+-------------------+---------+----------+--------------+---+-----+----+----------+
|OrderID|ProductID|         Description|Quantity|          OrderDate|UnitPrice|CustomerID|       Country|Day|Month|Year|HourMinute|
+-------+---------+--------------------+--------+-------------------+---------+----------+--------------+---+-----+----+----------+
| 536596|    21624|VINTAGE UNION JAC...|       1|2010-12-01 17:29:00|     5.95|     18303|United Kingdom|  1|   12|2010|     17:29|
| 536596|    22900| SET 2 TEA TOWELS...|       1|2010-12-01 17:29:00|     2.95|     18303|United Kingdom|  1|   12|2010|     17:29|
| 536596|    22114|HOT WATER BOTTLE ...|       1|2010-12-01 17:29:00|     3.95|     18303|United Kingdom|  1|   12|2010|     17:29|
| 536596|    21967|PACK OF 12 SKULL ...|       1|2010-12-01 17:29:00|     0.29|     18303|United Kingdom|  1|   12|2010|     17:29|
| 536596|   84926A|WAKE UP COCKEREL ...|       4|2010-12-01 17:29:00|     1.

In [29]:
# Print the schema after null handling and de-duplication.
df.printSchema()

root
 |-- OrderID: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Description: string (nullable = false)
 |-- Quantity: integer (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- HourMinute: string (nullable = true)



In [6]:
def transform_calc_data(data_source: str, output_uri: str) -> None:
    """
    Clean the Online Retail CSV and print a 10-row preview of the result.

    Steps, in order: load the CSV (header row, inferred types); rename
    InvoiceNo/StockCode/InvoiceDate to OrderID/ProductID/OrderDate; trim and
    parse OrderDate into a timestamp; derive Day, Month, Year and HourMinute;
    give every order whose CustomerID is null a newly generated CustomerID;
    lower-case Description and replace its nulls with "unknown"; drop rows that
    still contain nulls and drop exact duplicate rows.

    :param data_source: The URI or path of the Online Retail CSV,
    such as 's3://DOC-EXAMPLE-BUCKET/OnlineRetail.csv'.
    :param output_uri: The URI where output is meant to be written.
    NOTE: currently unused — the Parquet write at the end is still disabled.
    :raises ValueError: If ``data_source`` is None.
    """
    # Fail fast: previously a None source skipped the load and crashed later
    # with UnboundLocalError on the first use of ``df``.
    if data_source is None:
        raise ValueError("data_source must not be None")

    # NOTE(review): per the PySpark docs, leaving the ``with`` block stops the
    # session. getOrCreate() may hand back an already-running session (e.g. a
    # notebook's shared ``spark``), which would then be stopped too — confirm
    # this is intended.
    with SparkSession.builder.appName("emr-cluster-{}".format(
        datetime.today())).getOrCreate() as spark:

        # Load the Online Retail CSV data
        df: DataFrame = spark.read.csv(data_source, header=True, inferSchema=True)
        # Log into EMR stdout
        print(f"Dataset have shape: {(df.count(), df.columns)}")

        # Rename Columns
        col_renames = {
            'InvoiceNo': 'OrderID',
            'StockCode': 'ProductID',
            'InvoiceDate': 'OrderDate'
        }
        for old_name, new_name in col_renames.items():
            df = df.withColumnRenamed(old_name, new_name)

        # Remove spaces
        df = df.withColumn("OrderDate", F.trim(F.col("OrderDate")))

        # Change data type for column OrderDate.
        # Candidate patterns are tried in order and the first non-null parse
        # wins. The original list repeated one pattern three times; a single
        # entry is equivalent. Append distinct patterns here if needed.
        DATE_FORMAT = ["M/d/yyyy H:mm"]
        df = df.withColumn(
            "OrderDate",
            F.coalesce(
                *[F.to_timestamp(F.col("OrderDate"), fmt) for fmt in DATE_FORMAT]
            )
        )

        # Feature Engineering
        TIME_FORMAT = "HH:mm"
        df = df.withColumn("Day", F.dayofmonth("OrderDate"))
        df = df.withColumn("Month", F.month("OrderDate"))
        df = df.withColumn("Year", F.year("OrderDate"))
        df = df.withColumn("HourMinute", F.date_format("OrderDate", TIME_FORMAT))

        # Starting offset for the generated CustomerIDs: the largest existing
        # ID. F.max yields None when every CustomerID is null; fall back to 0
        # so the generated IDs do not become null as well.
        max_customer_id = df.agg(F.max("CustomerID")).collect()[0][0] or 0

        # Distinct OrderIDs that have at least one row with a null CustomerID
        order_ids_with_null_customer = df.filter(F.col("CustomerID").isNull()).select("OrderID").distinct()

        # Number those OrderIDs 1..n (ordered by OrderID) and shift past the
        # current maximum. The window has no partitionBy, so Spark evaluates it
        # on a single partition and logs a WindowExec warning.
        window_spec = Window.orderBy("OrderID")
        order_ids_with_new_customer = order_ids_with_null_customer.withColumn(
            "new_CustomerID", F.row_number().over(window_spec) + max_customer_id)

        # Join the generated IDs back by OrderID and use them only where the
        # original CustomerID is null
        df = df.join(order_ids_with_new_customer, "OrderID", "left").withColumn(
            "CustomerID", F.coalesce(F.col("CustomerID"), F.col("new_CustomerID"))
        ).drop("new_CustomerID")

        # Normalise Description to lower case, then replace nulls with "unknown"
        df = df.withColumn("Description", F.lower(F.col("Description")))
        df = df.na.fill({"Description": "unknown"})

        # Drop rows with any remaining null, then exact duplicate rows
        df = df.dropna()
        df = df.dropDuplicates()

        # Write our results as parquet files
        # TODO: re-enable once the output location is settled; output_uri is
        # unused until then.
        # df.write.mode("overwrite").parquet(output_uri)
        df.show(10)

In [7]:
# Run the pipeline on the local CSV. The second argument (output URI) is not
# used by the function as written, because its write step is commented out.
transform_calc_data(URL, "sdjf")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/15 01:32:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Dataset have shape: (541909, ['InvoiceNo', 'StockCode', 'Description', 'Quantity', 'InvoiceDate', 'UnitPrice', 'CustomerID', 'Country'])


24/09/15 01:32:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 01:32:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 01:32:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 01:32:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 01:32:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 01:32:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 0

+-------+---------+--------------------+--------+-------------------+---------+----------+--------------+---+-----+----+----------+
|OrderID|ProductID|         Description|Quantity|          OrderDate|UnitPrice|CustomerID|       Country|Day|Month|Year|HourMinute|
+-------+---------+--------------------+--------+-------------------+---------+----------+--------------+---+-----+----+----------+
| 536596|    21624|vintage union jac...|       1|2010-12-01 17:29:00|     5.95|     18303|United Kingdom|  1|   12|2010|     17:29|
| 536596|    22900| set 2 tea towels...|       1|2010-12-01 17:29:00|     2.95|     18303|United Kingdom|  1|   12|2010|     17:29|
| 536596|    22114|hot water bottle ...|       1|2010-12-01 17:29:00|     3.95|     18303|United Kingdom|  1|   12|2010|     17:29|
| 536596|    21967|pack of 12 skull ...|       1|2010-12-01 17:29:00|     0.29|     18303|United Kingdom|  1|   12|2010|     17:29|
| 536596|   84926A|wake up cockerel ...|       4|2010-12-01 17:29:00|     1.