### Install PySpark

In [1]:
!pip install pyspark



### Import Libraries

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
import os
from pymongo import MongoClient

### Set Java Home

In [3]:
os.environ["JAVA_HOME"] = "C:\Program Files\Java\jdk-17"

### Initiate Spark Context

In [4]:
conf = SparkConf() \
    .setAppName("Example") \
    .setMaster("local") \
    .set("spark.driver.extraClassPath", "C:/Users/SD-LORENZO-PC/anaconda3/envs/geoDev/Scripts/pyspark/*")

In [5]:
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

In [6]:
spark

### Read CSV File

In [7]:
file_csv = r"C:\Users\SD-LORENZO-PC\pyproject\rndPy\Geospatial\apacheSpark\AdvWorksData.csv"

df = spark.read.options(delimeter=",", header=True).csv(file_csv)
df.show()

+---------------+------------------+--------------------+--------------+--------------+------+-------------+----------------+-----------+----------+---------+------------+---------+-----------------+--------+---------+-----------------+----------+-------------+---------+
|productcategory|productsubcategory|             product| saleterritory|       Country|  City|         Sate|        Customer|   Employee|OrderCount|OrderDate|StandardCost|UnitPrice|UnitPriceDiscount|Discount|ListPrice|SaleswithStandard|  NetSales|OrderQuantity|    Sales|
+---------------+------------------+--------------------+--------------+--------------+------+-------------+----------------+-----------+----------+---------+------------+---------+-----------------+--------+---------+-----------------+----------+-------------+---------+
|       Clothing|              Caps|        AWC Logo Cap|United Kingdom|United Kingdom| Berks|      England|      Gary Suess|Amy Alberts|         1|  00:00.0|      6.9223|   5.0136|   

In [8]:
df.printSchema()

root
 |-- productcategory: string (nullable = true)
 |-- productsubcategory: string (nullable = true)
 |-- product: string (nullable = true)
 |-- saleterritory: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Sate: string (nullable = true)
 |-- Customer: string (nullable = true)
 |-- Employee: string (nullable = true)
 |-- OrderCount: string (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- StandardCost: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- UnitPriceDiscount: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- ListPrice: string (nullable = true)
 |-- SaleswithStandard: string (nullable = true)
 |-- NetSales: string (nullable = true)
 |-- OrderQuantity: string (nullable = true)
 |-- Sales: string (nullable = true)



### Common Dataframe Actions

In [9]:
df1 = df[['productcategory', 'saleterritory', 'OrderDate', 'Sales']]
df1.show()

+---------------+--------------+---------+---------+
|productcategory| saleterritory|OrderDate|    Sales|
+---------------+--------------+---------+---------+
|       Clothing|United Kingdom|  00:00.0|68.786592|
|    Accessories|United Kingdom|  00:00.0|       90|
|       Clothing|United Kingdom|  00:00.0|  182.352|
|       Clothing|United Kingdom|  00:00.0| 317.5964|
|       Clothing|United Kingdom|  00:00.0|  159.558|
|       Clothing|United Kingdom|  00:00.0|   45.588|
|       Clothing|United Kingdom|  00:00.0|   22.794|
|       Clothing|United Kingdom|  00:00.0|   22.794|
|       Clothing|United Kingdom|  00:00.0|  42.3867|
|       Clothing|United Kingdom|  00:00.0| 113.0312|
|       Clothing|United Kingdom|  00:00.0|  42.3867|
|     Components|United Kingdom|  00:00.0|  826.164|
|     Components|United Kingdom|  00:00.0|  149.676|
|     Components|United Kingdom|  00:00.0| 1472.291|
|     Components|United Kingdom|  00:00.0| 736.1455|
|     Components|United Kingdom|  00:00.0| 744

In [21]:
saleterritory = df.groupBy('saleterritory').count()
print(saleterritory.show())

+--------------+-----+
| saleterritory|count|
+--------------+-----+
|       Germany| 1864|
|        France| 3530|
|     Northwest| 7872|
|     Southeast| 5937|
|       Central| 5812|
|        Canada|11444|
|     Southwest|13379|
|     Australia| 1713|
|United Kingdom| 3520|
|     Northeast| 5809|
+--------------+-----+

None


### Use Spark SQL to query data

In [11]:
df.createOrReplaceTempView("sales")
output = spark.sql('SELECT * from sales')
output.show()

+---------------+------------------+--------------------+--------------+--------------+------+-------------+----------------+-----------+----------+---------+------------+---------+-----------------+--------+---------+-----------------+----------+-------------+---------+
|productcategory|productsubcategory|             product| saleterritory|       Country|  City|         Sate|        Customer|   Employee|OrderCount|OrderDate|StandardCost|UnitPrice|UnitPriceDiscount|Discount|ListPrice|SaleswithStandard|  NetSales|OrderQuantity|    Sales|
+---------------+------------------+--------------------+--------------+--------------+------+-------------+----------------+-----------+----------+---------+------------+---------+-----------------+--------+---------+-----------------+----------+-------------+---------+
|       Clothing|              Caps|        AWC Logo Cap|United Kingdom|United Kingdom| Berks|      England|      Gary Suess|Amy Alberts|         1|  00:00.0|      6.9223|   5.0136|   

In [12]:
output = spark.sql('SELECT * from sales WHERE `UnitPrice` > 20 AND OrderQuantity >=18')
output.show()

+---------------+------------------+--------------------+-------------+-------------+------------+----------------+----------------+--------------+----------+---------+------------+---------+-----------------+--------+---------+-----------------+------------+-------------+-----------+
|productcategory|productsubcategory|             product|saleterritory|      Country|        City|            Sate|        Customer|      Employee|OrderCount|OrderDate|StandardCost|UnitPrice|UnitPriceDiscount|Discount|ListPrice|SaleswithStandard|    NetSales|OrderQuantity|      Sales|
+---------------+------------------+--------------------+-------------+-------------+------------+----------------+----------------+--------------+----------+---------+------------+---------+-----------------+--------+---------+-----------------+------------+-------------+-----------+
|       Clothing|             Vests|     Classic Vest, M|      Germany|      Germany|Grevenbroich|          Bayern|  Stephen Burton|   Amy Alb

In [13]:
output = spark.sql('SELECT COUNT(*) as total, productcategory from sales GROUP BY productcategory')
output.show()

+-----+---------------+
|total|productcategory|
+-----+---------------+
|24800|          Bikes|
|12260|       Clothing|
| 5098|    Accessories|
|18683|     Components|
|   39|           NULL|
+-----+---------------+



In [14]:
output = spark.sql('SELECT * from sales WHERE `UnitPrice` > 20 AND OrderQuantity >= 10')
output.show()

+---------------+------------------+--------------------+--------------+--------------+-------------+----------+--------------------+--------------+----------+---------+------------+---------+-----------------+--------+---------+-----------------+------------+-------------+-----------+
|productcategory|productsubcategory|             product| saleterritory|       Country|         City|      Sate|            Customer|      Employee|OrderCount|OrderDate|StandardCost|UnitPrice|UnitPriceDiscount|Discount|ListPrice|SaleswithStandard|    NetSales|OrderQuantity|      Sales|
+---------------+------------------+--------------------+--------------+--------------+-------------+----------+--------------------+--------------+----------+---------+------------+---------+-----------------+--------+---------+-----------------+------------+-------------+-----------+
|       Clothing|           Jerseys|Long-Sleeve Logo ...|United Kingdom|United Kingdom|        Berks|   England|          Gary Suess|   Amy

### Persist

### Show data 

In [15]:
mongo_uri = "mongodb+srv://devadmin:2553669874123654@dev218.kke0v.mongodb.net/?retryWrites=true&w=majority&appName=dev218"
database_name = "csv_data"
collection_name = "Adv_Works"

In [16]:
from pymongo import MongoClient

client = MongoClient(mongo_uri)
db = client[database_name]
collection = db[collection_name]

data_to_insert = df.toPandas().to_dict(orient="records")

if data_to_insert:
    collection.insert_many(data_to_insert)
    print("Sending data to MongoDB is succesfull!")
else: 
    print("No data to insert into MongoDB")

Sending data to MongoDB is succesfull!


In [17]:
naem = input("name here :")

name here :  test
