In [2]:
import os
from dotenv import load_dotenv
from pymongo.server_api import ServerApi
from pymongo.mongo_client import MongoClient

In [3]:
# Load variables from a local .env file (if present) into the process environment.
load_dotenv()

# The connection string is read from the environment so no credentials live in the notebook.
uri = os.getenv("MONGO_ATLAS_URL")
if not uri:
    # Fail early: PyMongo falls back to localhost when the host is None, which
    # would hide a missing/misnamed environment variable.
    raise RuntimeError(
        "MONGO_ATLAS_URL is not set; define it in the environment or in a .env file."
    )

client = MongoClient(uri, server_api=ServerApi("1"))

# Ping the server so connection problems surface in this cell rather than in a later one.
try:
    client.admin.command("ping")
    print("Conectado no MongoDB!")
except Exception as e:
    # Show the error, then re-raise: every following cell needs a working connection.
    print(e)
    raise

Conectado no MongoDB!


In [4]:
# Handle to the `db_sales` database on the connected client.
db = client.db_sales

In [5]:
# Show the names of the collections available in `db`.
db.list_collection_names()

['sales']

In [6]:
# Handle to the `sales` collection inside `db`.
collection = db.sales

In [7]:
# Peek at the first five documents (empty filter = match everything) to see
# the raw field names and value types stored in MongoDB.
list(
    collection
    .find({})
    .limit(5)
)

[{'_id': ObjectId('67b387e92a80135900d5002e'),
  'ORDERNUMBER': '10107',
  'QUANTITYORDERED': '30',
  'PRICEEACH': '95.7',
  'ORDERLINENUMBER': '2',
  'SALES': '2871',
  'ORDERDATE': '2/24/2003 0:00',
  'STATUS': 'Shipped',
  'QTR_ID': '1',
  'MONTH_ID': '2',
  'YEAR_ID': '2003',
  'PRODUCTLINE': 'Motorcycles',
  'MSRP': '95',
  'PRODUCTCODE': 'S10_1678',
  'CUSTOMERNAME': 'Land of Toys Inc.',
  'PHONE': '2125557818',
  'ADDRESSLINE1': '897 Long Airport Avenue',
  'ADDRESSLINE2': '',
  'CITY': 'NYC',
  'STATE': 'NY',
  'POSTALCODE': '10022',
  'COUNTRY': 'USA',
  'TERRITORY': 'NA',
  'CONTACTLASTNAME': 'Yu',
  'CONTACTFIRSTNAME': 'Kwai',
  'DEALSIZE': 'Small'},
 {'_id': ObjectId('67b387e92a80135900d5002f'),
  'ORDERNUMBER': '10121',
  'QUANTITYORDERED': '34',
  'PRICEEACH': '81.35',
  'ORDERLINENUMBER': '5',
  'SALES': '2765.9',
  'ORDERDATE': '5/7/2003 0:00',
  'STATUS': 'Shipped',
  'QTR_ID': '2',
  'MONTH_ID': '5',
  'YEAR_ID': '2003',
  'PRODUCTLINE': 'Motorcycles',
  'MSRP': '95',

In [11]:
from pyspark.sql import SparkSession


# Create (or reuse) the Spark session for this notebook. The MongoDB connector
# is requested through `spark.jars.packages`, and the same `uri` is used for
# both the read and the write connection settings.
spark = (
    SparkSession.builder
    .appName("Sales Data Spark")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector:10.0.3")
    .config("spark.mongodb.read.connection.uri", uri)
    .config("spark.mongodb.write.connection.uri", uri)
    .getOrCreate()
)

In [12]:
# Load the `sales` collection of `db_sales` into a Spark DataFrame through the
# "mongodb" data source.
df = (
    spark.read
    .format("mongodb")
    .options(database="db_sales", collection="sales")
    .load()
)

In [13]:
# Print the column names and types of the DataFrame loaded from MongoDB.
df.printSchema()

root
 |-- ADDRESSLINE1: string (nullable = true)
 |-- ADDRESSLINE2: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- CONTACTFIRSTNAME: string (nullable = true)
 |-- CONTACTLASTNAME: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- CUSTOMERNAME: string (nullable = true)
 |-- DEALSIZE: string (nullable = true)
 |-- MONTH_ID: string (nullable = true)
 |-- MSRP: string (nullable = true)
 |-- ORDERDATE: string (nullable = true)
 |-- ORDERLINENUMBER: string (nullable = true)
 |-- ORDERNUMBER: string (nullable = true)
 |-- PHONE: string (nullable = true)
 |-- POSTALCODE: string (nullable = true)
 |-- PRICEEACH: string (nullable = true)
 |-- PRODUCTCODE: string (nullable = true)
 |-- PRODUCTLINE: string (nullable = true)
 |-- QTR_ID: string (nullable = true)
 |-- QUANTITYORDERED: string (nullable = true)
 |-- SALES: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- TERRITORY: string (nullable = true)
 |

In [14]:
# Render the first five rows as a text table.
df.show(n=5)

25/02/17 19:08:42 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+------------+-------------+----------------+---------------+-------+--------------------+--------+--------+----+---------------+---------------+-----------+----------------+----------+---------+-----------+-----------+------+---------------+-------+-----+-------+---------+-------+--------------------+
|        ADDRESSLINE1|ADDRESSLINE2|         CITY|CONTACTFIRSTNAME|CONTACTLASTNAME|COUNTRY|        CUSTOMERNAME|DEALSIZE|MONTH_ID|MSRP|      ORDERDATE|ORDERLINENUMBER|ORDERNUMBER|           PHONE|POSTALCODE|PRICEEACH|PRODUCTCODE|PRODUCTLINE|QTR_ID|QUANTITYORDERED|  SALES|STATE| STATUS|TERRITORY|YEAR_ID|                 _id|
+--------------------+------------+-------------+----------------+---------------+-------+--------------------+--------+--------+----+---------------+---------------+-----------+----------------+----------+---------+-----------+-----------+------+---------------+-------+-----+-------+---------+-------+--------------------+
|897 Long Airport ...|   

                                                                                

In [None]:
# List every distinct value of the PRODUCTLINE column.
(
    df
    .select('PRODUCTLINE')
    .distinct()
    .show()
)

+----------------+
|     PRODUCTLINE|
+----------------+
|     Motorcycles|
|    Classic Cars|
|Trucks and Buses|
|    Vintage Cars|
|          Planes|
|           Ships|
|          Trains|
+----------------+



In [26]:
import pyspark.sql.functions as f
import pyspark.sql.types as t


# Summarise the data per product line: number of rows and total SALES,
# ordered from the highest total to the lowest.
df_productlines = (
    df.groupBy('PRODUCTLINE')
    .agg(
        f.count('PRODUCTLINE').alias('Count'),
        # SALES is loaded as a string column, so convert it to a double
        # explicitly instead of relying on implicit coercion inside sum().
        # The final integer cast keeps the whole-number totals of the original
        # cell; note that it truncates the fractional part rather than rounding.
        f.sum(f.col('SALES').cast(t.DoubleType()))
        .cast(t.IntegerType())
        .alias('Total Sales'),
    )
    .orderBy('Total Sales', ascending=False)
)
df_productlines.show()

+----------------+-----+-----------+
|     PRODUCTLINE|Count|Total Sales|
+----------------+-----+-----------+
|    Classic Cars|  967|    3919615|
|    Vintage Cars|  607|    1903150|
|     Motorcycles|  331|    1166388|
|Trucks and Buses|  301|    1127789|
|          Planes|  306|     975003|
|           Ships|  234|     714437|
|          Trains|   77|     226243|
+----------------+-----+-----------+



                                                                                

In [31]:
# Render a single row to re-check the column layout.
df.show(n=1)

+--------------------+------------+----+----------------+---------------+-------+-----------------+--------+--------+----+--------------+---------------+-----------+----------+----------+---------+-----------+-----------+------+---------------+-----+-----+-------+---------+-------+--------------------+
|        ADDRESSLINE1|ADDRESSLINE2|CITY|CONTACTFIRSTNAME|CONTACTLASTNAME|COUNTRY|     CUSTOMERNAME|DEALSIZE|MONTH_ID|MSRP|     ORDERDATE|ORDERLINENUMBER|ORDERNUMBER|     PHONE|POSTALCODE|PRICEEACH|PRODUCTCODE|PRODUCTLINE|QTR_ID|QUANTITYORDERED|SALES|STATE| STATUS|TERRITORY|YEAR_ID|                 _id|
+--------------------+------------+----+----------------+---------------+-------+-----------------+--------+--------+----+--------------+---------------+-----------+----------+----------+---------+-----------+-----------+------+---------------+-----+-----+-------+---------+-------+--------------------+
|897 Long Airport ...|            | NYC|            Kwai|             Yu|    USA|Land of

In [34]:
# List the distinct COUNTRY values whose text length is not exactly 3 characters.
(
    df
    .select('COUNTRY')
    .filter(f.length(f.col('COUNTRY')) != 3)
    .distinct()
    .show()
)

+-----------+
|    COUNTRY|
+-----------+
|     France|
|     Norway|
|  Australia|
|    Finland|
|    Austria|
|         UK|
|      Spain|
|     Sweden|
|  Singapore|
|     Canada|
|      Japan|
|      Italy|
|    Denmark|
|    Belgium|
|Philippines|
|    Germany|
|Switzerland|
|    Ireland|
+-----------+



In [None]:
# List distinct ORDERNUMBER values (show() prints only the first 20 by default).
(
    df
    .select('ORDERNUMBER')
    .distinct()
    .show()
)

+-----------+
|ORDERNUMBER|
+-----------+
|      10107|
|      10121|
|      10134|
|      10145|
|      10159|
|      10168|
|      10180|
|      10188|
|      10201|
|      10211|
|      10223|
|      10237|
|      10251|
|      10263|
|      10275|
|      10285|
|      10299|
|      10309|
|      10318|
|      10329|
+-----------+
only showing top 20 rows



                                                                                