In [1]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [2]:
spark = SparkSession.builder \
    .appName("mongo-spark-postman-pipeline") \
    .getOrCreate()

In [3]:
df = spark.read.csv('products.csv', inferSchema=True, header=True)
df.printSchema()
df.count()

root
 |-- name: string (nullable = true)
 |-- sku: string (nullable = true)
 |-- description: string (nullable = true)



861686

In [4]:
df.createOrReplaceTempView("products")

In [5]:
df = spark.sql("SELECT distinct * from products where sku is not null and description is not null")

In [6]:
df.count()

500000

In [7]:
df.show()

+------------------+-------------------+--------------------+
|              name|                sku|         description|
+------------------+-------------------+--------------------+
|    Mary Rodriguez| hand-couple-manage|Senior word socia...|
|    Jose Henderson| together-table-oil|Apply girl treatm...|
|    Karen Villegas|     child-somebody|Every tell serve....|
|      Olivia Lynch|forget-matter-avoid|Perhaps environme...|
|     Whitney Wiley|    side-blue-dream|Quickly short soc...|
|  Brittany Johnson|        east-pretty|Indicate view sim...|
|       Paul Morris|    radio-window-us|Society month sho...|
|   Jason Patterson|   night-art-be-act|Entire around pla...|
|      Kiara Gentry|   compare-politics|Air my kind staff...|
| William Hernandez|    skin-should-old|Stock support nee...|
|      Jason Osborn|      tv-close-next|Talk view rate ki...|
| Meagan Mccullough|  woman-environment|Stay above task. ...|
|   Sabrina Mullins|life-responsibility|Computer relation...|
|       

In [8]:
df.write \
    .format("mongo") \
    .mode('ignore') \
    .option("uri", "mongodb://localhost:27017/postman.products") \
    .option("database", "postman") \
    .option("collection", "products") \
    .save()

In [9]:
no_of_products = spark.sql("SELECT name, count(name) as no_of_products FROM products group by name having count(name)>1 order by 2 desc")

In [10]:
print(no_of_products.count(), "unique products")

84941 unique products


In [11]:
no_of_products.show()

+-----------------+--------------+
|             name|no_of_products|
+-----------------+--------------+
|    Michael Smith|           247|
|  Michael Johnson|           187|
|     Robert Smith|           167|
|Christopher Smith|           159|
|      David Smith|           158|
| Michael Williams|           157|
|       John Smith|           157|
|      James Smith|           152|
|   Jennifer Smith|           151|
|    Michael Brown|           148|
|    David Johnson|           138|
| Jennifer Johnson|           131|
|     John Johnson|           123|
|    James Johnson|           122|
|    Michael Jones|           113|
|   Michael Miller|           110|
|   David Williams|           108|
|   Robert Johnson|           104|
|      David Brown|           103|
|     Joseph Smith|           103|
+-----------------+--------------+
only showing top 20 rows



In [12]:
no_of_products.write \
    .format("mongo") \
    .mode('ignore') \
    .option("uri", "mongodb://localhost:27017/postman.no_of_products") \
    .option("database", "postman") \
    .option("collection", "no_of_products") \
    .save()

In [13]:
# df_agg = df.groupBy(df['sku']).agg(F.collect_list(F.to_json(F.struct(df['name'], df['description']))).alias('name_description'))
df_agg = df.groupBy(F.col("sku")).agg(F.collect_list(F.struct(F.col("name"),F.col("description"))).alias("name_description"))

In [14]:
df_agg.printSchema()

root
 |-- sku: string (nullable = true)
 |-- name_description: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- description: string (nullable = true)



In [15]:
df_agg.count()

466693

In [16]:
df_agg.show()

+-------------------+--------------------+
|                sku|    name_description|
+-------------------+--------------------+
|           a-effort|[[Toni Sanchez, P...|
|  a-loss-foot-quite|[[Mark Garcia, Tr...|
|      a-matter-seem|[[Matthew Thompso...|
|  ability-interview|[[Jaime Martinez,...|
| ability-none-carry|[[Stephen Roberts...|
|       able-between|[[Morgan Deleon, ...|
|    able-bring-none|[[Amanda Rogers, ...|
|     able-much-seek|[[Brian Mata, Rai...|
|about-each-as-bring|[[David Mason, A ...|
|    accept-congress|[[Kristi Crosby, ...|
|accept-enter-across|[[Sydney Gallaghe...|
| accept-learn-white|[[Joseph Solomon,...|
| according-painting|[[Christopher Coo...|
|account-reduce-yeah|[[Christina Chang...|
|across-able-quality|[[Thomas Baker, P...|
|across-finally-food|[[Heidi Hall, Wil...|
|across-partner-list|[[Cody Todd, Nece...|
|across-teacher-thus|[[Matthew Sharp, ...|
|    act-song-before|[[Angela King, Te...|
|   action-our-group|[[David Li, Court...|
+----------

In [24]:
df_agg.write \
    .format("mongo") \
    .mode('ignore') \
    .option("uri", "mongodb://localhost:27017/postman.products1") \
    .option("database", "postman") \
    .option("collection", "products1") \
    .save()