In [1]:

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))


In [120]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.functions import window, col

In [3]:
path='/content/drive/MyDrive/Online_Retail.csv'

In [4]:
pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [6]:
spark=(SparkSession.builder.appName('First_pro').getOrCreate())
spark

In [7]:
df=spark.read.format("csv").option('header','true')\
.option('inferSchema','true')\
.option("recursiveFileLookup", "true")\
.load(path)

In [8]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [9]:
df.show(10,truncate=False)

+---------+---------+-----------------------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate     |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+----------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |01/12/2010 08:26|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |01/12/2010 08:26|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |01/12/2010 08:26|2.75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |01/12/2010 08:26|3.39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |01/12/2010 08:26|3.39     |17850     |United Kingdom|
|536365   |22752    |SET 7 BABUSHKA NEST

In [10]:
from pyspark.sql.functions import *

In [18]:
dfsq=df.createOrReplaceTempView('Retial')

In [22]:
sql_query1='SELECT * FROM Retial'
q1=spark.sql(sql_query1)
q1.show()

+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01/12/2010 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01/12/2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01/12/2010 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01/12/2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01/12/2010 08:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|01/12/2010 08:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|01/1

In [27]:
## another with out sql
q2=df.select('*')
q2.show(5)

+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01/12/2010 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01/12/2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01/12/2010 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01/12/2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01/12/2010 08:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
only showing top 5 rows



In [47]:
# Define the SQL query for transformation
sql_query = """
SELECT Country, SUM(Quantity) as TotalQuantity
FROM Retial
WHERE Country IS NOT NULL
GROUP BY Country
ORDER BY TotalQuantity
"""

# Execute the query using Spark SQL
aggDF = spark.sql(sql_query)

# Show the top 10 rows of the aggregated DataFrame
aggDF.show(10)

+--------------------+-------------+
|             Country|TotalQuantity|
+--------------------+-------------+
|        Saudi Arabia|           75|
|             Bahrain|          260|
|                 RSA|          352|
|              Brazil|          356|
|             Lebanon|          386|
|  European Community|          497|
|      Czech Republic|          592|
|           Lithuania|          652|
|               Malta|          944|
|United Arab Emirates|          982|
+--------------------+-------------+
only showing top 10 rows



In [76]:
q2=df.select('Country','Quantity')\
.groupby('Country')\
.agg(sum('Quantity').alias('total_Q'))\
.sort('total_Q')\
.where('total_Q < 800')

In [77]:
q2.show(10)

+------------------+-------+
|           Country|total_Q|
+------------------+-------+
|      Saudi Arabia|     75|
|           Bahrain|    260|
|               RSA|    352|
|            Brazil|    356|
|           Lebanon|    386|
|European Community|    497|
|    Czech Republic|    592|
|         Lithuania|    652|
+------------------+-------+



In [90]:
strschema=df.schema

In [95]:
strdf=spark.readStream.schema(strschema).format('csv')\
.option('maxfilepertrigger',1)\
.option('header','true')\
.load('/content/drive/MyDrive/Retail_Days/by-day/*')\



In [96]:
# Register the streaming DataFrame as a temporary view for SQL operations
strdf.createOrReplaceTempView("streaming_data")

In [103]:
# Define the SQL query for transformation
sql_query = """
SELECT
    CustomerID as CustomerId,
    (UnitPrice * Quantity) as total_cost,
    InvoiceDate
FROM
    streaming_data
    """

qs1=spark.sql(sql_query)
qs1.createGlobalTempView('Transformeddata')

In [137]:
# Correct way to run the streaming query
query2 = qs1.writeStream.format("memory").queryName("transformed_data").outputMode("append").start()

# You must keep the stream alive
# Or process.awaitAnyTermination() in notebooks

In [139]:
# Check the current status of the streaming query
status = query2.status
status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [142]:

# Check if the streaming query is active
is_active = query2.isActive
is_active

True

In [143]:

# get the last progress report
last_progress = query.lastProgress
print(f"Last progress: {last_progress}")

Last progress: {'id': '95590424-26af-4b0f-959b-d6e00f2d2b65', 'runId': '0a64e10c-8ee3-457e-9759-4b1f7d31e2f4', 'name': 'transformed_data', 'timestamp': '2025-04-03T22:40:30.496Z', 'batchId': 1, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 2701, 'triggerExecution': 2701}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/content/drive/MyDrive/Retail_Days/by-day/*]', 'startOffset': {'logOffset': 0}, 'endOffset': {'logOffset': 0}, 'latestOffset': None, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0}], 'sink': {'description': 'MemorySink', 'numOutputRows': 0}}


In [147]:
# Query the result to display the top 10 rows
result = spark.sql("""
SELECT *
FROM transformed_data
ORDER BY total_cost DESC
""")
result.show(10)

+----------+-----------------+-------------------+
|CustomerId|       total_cost|        InvoiceDate|
+----------+-----------------+-------------------+
|      NULL|          77183.6|2011-01-18 10:01:00|
|      NULL|          38970.0|2011-06-10 15:28:00|
|      NULL|         13541.33|2010-12-07 15:08:00|
|      NULL|         11062.06|2011-08-12 14:50:00|
|      NULL|          8142.75|2011-05-03 13:46:00|
|      NULL|7144.719999999999|2011-09-20 11:05:00|
|      NULL|6539.400000000001|2011-01-11 12:55:00|
|      NULL|6539.400000000001|2011-04-18 13:20:00|
|      NULL|           4921.5|2011-01-11 12:55:00|
|      NULL|           4632.0|2011-04-18 13:20:00|
+----------+-----------------+-------------------+
only showing top 10 rows

