In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataAnalysis using Pyspark")\
.config("spark.memory.offHeap.enabled","true").config("spark.memory.offHeap.size","20g").getOrCreate()



In [2]:
df= spark.read.csv('OnlineRetail.csv',header = True,escape = "\"")



In [3]:
df.show(5,0)

+---------+---------+-----------------------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate     |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+----------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |01-12-2010 08:26|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |01-12-2010 08:26|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |01-12-2010 08:26|2.75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |01-12-2010 08:26|3.39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |01-12-2010 08:26|3.39     |17850     |United Kingdom|
+---------+---------+-------------------

In [4]:
df.count()


541909

In [5]:
#how many unique customers are present in the given dataset
df.select('CustomerId').distinct().count()

4373

In [6]:
df.head()

Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity='6', InvoiceDate='01-12-2010 08:26', UnitPrice='2.55', CustomerID='17850', Country='United Kingdom')

In [7]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

df.groupBy('Country').agg(countDistinct('CustomerID').alias('country_count')).show()

+------------------+-------------+
|           Country|country_count|
+------------------+-------------+
|            Sweden|            8|
|         Singapore|            1|
|           Germany|           95|
|               RSA|            1|
|            France|           87|
|            Greece|            4|
|European Community|            1|
|           Belgium|           25|
|           Finland|           12|
|             Malta|            2|
|       Unspecified|            4|
|             Italy|           15|
|              EIRE|            3|
|         Lithuania|            1|
|            Norway|           10|
|             Spain|           31|
|           Denmark|            9|
|         Hong Kong|            0|
|            Israel|            4|
|           Iceland|            1|
+------------------+-------------+
only showing top 20 rows



In [8]:
df.groupBy('Country').agg(countDistinct('CustomerID').alias('country_count')).orderBy(desc('country_count')).show()

+---------------+-------------+
|        Country|country_count|
+---------------+-------------+
| United Kingdom|         3950|
|        Germany|           95|
|         France|           87|
|          Spain|           31|
|        Belgium|           25|
|    Switzerland|           21|
|       Portugal|           19|
|          Italy|           15|
|        Finland|           12|
|        Austria|           11|
|         Norway|           10|
|        Denmark|            9|
|Channel Islands|            9|
|      Australia|            9|
|    Netherlands|            9|
|         Sweden|            8|
|         Cyprus|            8|
|          Japan|            8|
|         Poland|            6|
|         Greece|            4|
+---------------+-------------+
only showing top 20 rows



In [9]:
#when was the most recent purchase made by a customer on the e-commmerce platform
df.select(max("InvoiceDate")).show()

+----------------+
|max(InvoiceDate)|
+----------------+
|31-10-2011 17:19|
+----------------+



In [10]:
df.show()


+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01-12-2010 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01-12-2010 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|01-12-2010 08:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|01-1

In [1]:
df = df.withColumn("from_date",lit("1/10/2011 10:04"))

NameError: name 'df' is not defined

In [12]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- from_date: string (nullable = false)



In [13]:
df = df.drop("date")

In [14]:
df = df.join(df.groupBy('CustomerID').agg(max('Quantity').alias('Quantity')),on = 'Quantity',how='leftsemi')

In [15]:
df_freq= df.groupBy("CustomerID").agg(count('InvoiceDate').alias('frequency'))
df_freq.show()

+----------+---------+
|CustomerID|frequency|
+----------+---------+
|     16250|       24|
|     15574|      168|
|     15555|      918|
|     15271|      274|
|     17714|       10|
|     17686|      286|
|     13865|       30|
|     14157|       48|
|     13610|      228|
|     17757|      735|
|     17551|       43|
|     13187|       37|
|     16549|      979|
|     12637|      394|
|     15052|       30|
|     15448|       28|
|     13985|      348|
|     12888|        7|
|     14525|      296|
|     18283|      755|
+----------+---------+
only showing top 20 rows



In [16]:
df3= df.join(df_freq,on = 'CustomerID',how = 'inner')
df3.show()

+----------+--------+---------+---------+--------------------+----------------+---------+--------------+---------------+---------+
|CustomerID|Quantity|InvoiceNo|StockCode|         Description|     InvoiceDate|UnitPrice|       Country|      from_date|frequency|
+----------+--------+---------+---------+--------------------+----------------+---------+--------------+---------------+---------+
|     17850|       6|   536365|   85123A|WHITE HANGING HEA...|01-12-2010 08:26|     2.55|United Kingdom|1/10/2011 10:04|      312|
|     17850|       6|   536365|    71053| WHITE METAL LANTERN|01-12-2010 08:26|     3.39|United Kingdom|1/10/2011 10:04|      312|
|     17850|       8|   536365|   84406B|CREAM CUPID HEART...|01-12-2010 08:26|     2.75|United Kingdom|1/10/2011 10:04|      312|
|     17850|       6|   536365|   84029G|KNITTED UNION FLA...|01-12-2010 08:26|     3.39|United Kingdom|1/10/2011 10:04|      312|
|     17850|       6|   536365|   84029E|RED WOOLLY HOTTIE...|01-12-2010 08:26|    

In [17]:
df3.printSchema()

root
 |-- CustomerID: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- from_date: string (nullable = false)
 |-- frequency: long (nullable = false)



In [18]:
m_val = df3.withColumn('Total Amount',col("Quantity")*col("UnitPrice"))
m_val.show()

+----------+--------+---------+---------+--------------------+----------------+---------+--------------+---------------+---------+------------------+
|CustomerID|Quantity|InvoiceNo|StockCode|         Description|     InvoiceDate|UnitPrice|       Country|      from_date|frequency|      Total Amount|
+----------+--------+---------+---------+--------------------+----------------+---------+--------------+---------------+---------+------------------+
|     17850|       6|   536365|   85123A|WHITE HANGING HEA...|01-12-2010 08:26|     2.55|United Kingdom|1/10/2011 10:04|      312|15.299999999999999|
|     17850|       6|   536365|    71053| WHITE METAL LANTERN|01-12-2010 08:26|     3.39|United Kingdom|1/10/2011 10:04|      312|             20.34|
|     17850|       8|   536365|   84406B|CREAM CUPID HEART...|01-12-2010 08:26|     2.75|United Kingdom|1/10/2011 10:04|      312|              22.0|
|     17850|       6|   536365|   84029G|KNITTED UNION FLA...|01-12-2010 08:26|     3.39|United King

In [19]:
m_val = m_val.groupBy('CustomerID').agg(sum('Total Amount').alias('Monetory_value'))

In [20]:
m_val.show()

+----------+------------------+
|CustomerID|    Monetory_value|
+----------+------------------+
|     16250|389.44000000000005|
|     15574| 702.2500000000002|
|     15555| 4710.320000000003|
|     15271|           2442.47|
|     17714|             153.0|
|     17686|           5739.46|
|     13865|501.56000000000006|
|     14157| 432.8800000000001|
|     13610|1115.4299999999996|
|     17757| 5293.540000000004|
|     17551|            306.84|
|     13187|236.01999999999995|
|     16549| 4125.270000000001|
|     12637| 5953.250000000001|
|     15052|            215.78|
|     15448|494.64000000000004|
|     13985| 6867.029999999999|
|     12888|            313.77|
|     14525|3961.2700000000004|
|     18283|2078.6299999999997|
+----------+------------------+
only showing top 20 rows



In [21]:
finaldf = m_val.join(df3,on= "CustomerID",how = "inner")
finaldf.show()

+----------+------------------+--------+---------+---------+--------------------+----------------+---------+--------------+---------------+---------+
|CustomerID|    Monetory_value|Quantity|InvoiceNo|StockCode|         Description|     InvoiceDate|UnitPrice|       Country|      from_date|frequency|
+----------+------------------+--------+---------+---------+--------------------+----------------+---------+--------------+---------------+---------+
|     16250|389.44000000000005|       3|   536388|    21754|HOME BUILDING BLO...|01-12-2010 09:59|     5.95|United Kingdom|1/10/2011 10:04|       24|
|     16250|389.44000000000005|       3|   536388|    21755|LOVE BUILDING BLO...|01-12-2010 09:59|     5.95|United Kingdom|1/10/2011 10:04|       24|
|     16250|389.44000000000005|       2|   536388|    21523|DOORMAT FANCY FON...|01-12-2010 09:59|     7.95|United Kingdom|1/10/2011 10:04|       24|
|     16250|389.44000000000005|       3|   536388|    21363|HOME SMALL WOOD L...|01-12-2010 09:59|  

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
    .appName("ExampleApp") \
    .getOrCreate()

data = [("Alice", 1), ("Bob", 2)]

columns = ["name", "id"]

df = spark.createDataFrame(data=data, schema=columns)
df.show()


#The createDataFrame method takes two arguments: data and schema.

#Explicit Schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("id", IntegerType(), True)
])

df1 = spark.createDataFrame(data, schema)
df1.show()

+-----+---+
| name| id|
+-----+---+
|Alice|  1|
|  Bob|  2|
+-----+---+

+-----+---+
| name| id|
+-----+---+
|Alice|  1|
|  Bob|  2|
+-----+---+



In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when

# Create a SparkSession
spark = SparkSession.builder \
    .appName("ExampleApp") \
    .getOrCreate()

# Example DataFrame
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Using col and lit functions
df = df.withColumn("new_col", col("id") + lit(10))

# Using when function
df = df.withColumn("status", when(col("id") == 1, "active").otherwise("inactive"))

# Show the resulting DataFrame
df.show()

+---+-----+-------+--------+
| id| name|new_col|  status|
+---+-----+-------+--------+
|  1|Alice|     11|  active|
|  2|  Bob|     12|inactive|
+---+-----+-------+--------+



In [8]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("ExampleApp") \
    .getOrCreate()

# Defining a schema [ define an explicit schema]
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

# Creating a DataFrame with the defined schema
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], schema)
df.show()

+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+



In [5]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

finaldf = finaldf.withColumn('Quantity',col('Quantity').cast('double'))

assemble = VectorAssembler(inputCols = ['Quantity','frequency','Monetory_value'],outputCol = 'Features')
assembled_data = assemble.transform(finaldf)

scale = StandardScaler(inputCol = 'Features',outputCol = 'Standardized')
data_scale = scale.fit(assembled_data)
data_scale_output = data_scale.transform(assembled_data)
data_scale_output.show()


NameError: name 'finaldf' is not defined