<a href="https://colab.research.google.com/github/shivashankar328/pyspark_projects/blob/main/trustpilot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=78a9ae6dd26f1f5ead18dfcfd29ac3ba104173c5fa65eeac04fc815b5f5a996d
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, BooleanType
from pyspark.sql.functions import col, explode, from_json

In [4]:
spark = SparkSession.builder.appName('trustpilot_data').getOrCreate()

In [33]:
df = spark.read.csv('trustpilot_insurance.csv', header=True, inferSchema=True)

In [6]:
df.describe().show()

+-------+--------------------+------------------+--------------------+------------------+--------------------+------------------+------------------+--------------------+--------------------+--------------------+
|summary|      businessUnitId|             stars|     identifyingName|       displayName|             logoUrl|   numberOfReviews|        trustScore|            location|             contact|          categories|
+-------+--------------------+------------------+--------------------+------------------+--------------------+------------------+------------------+--------------------+--------------------+--------------------+
|  count|                  60|                60|                  60|                60|                  54|                60|                60|                  60|                  60|                  60|
|   mean|                NULL| 3.566666666666667|                NULL|              NULL|                NULL| 2770.883333333333|3.5483333333333333|    

In [7]:
df.printSchema()

root
 |-- businessUnitId: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- identifyingName: string (nullable = true)
 |-- displayName: string (nullable = true)
 |-- logoUrl: string (nullable = true)
 |-- numberOfReviews: integer (nullable = true)
 |-- trustScore: double (nullable = true)
 |-- location: string (nullable = true)
 |-- isRecommendedInCategories: boolean (nullable = true)
 |-- contact: string (nullable = true)
 |-- categories: string (nullable = true)



In [34]:
#drop unwanted columns
columns_todrop = ['businessUnitId','stars','identifyingName','logoUrl','isRecommendedInCategories']
df = df.drop(*columns_todrop)
df.show(5)

+--------------------+---------------+----------+--------------------+--------------------+--------------------+
|         displayName|numberOfReviews|trustScore|            location|             contact|          categories|
+--------------------+---------------+----------+--------------------+--------------------+--------------------+
|Allianz Partners USA|          82240|       4.2|{'address': '9950...|{'website': 'http...|[{'categoryId': '...|
|               AARDY|          25453|       4.9|{'address': '1200...|{'website': 'http...|[{'categoryId': '...|
|            QEEQ.COM|          12298|       4.2|{'address': None,...|{'website': 'http...|[{'categoryId': '...|
|                IMG®|          11458|       4.6|{'address': '9200...|{'website': 'http...|[{'categoryId': '...|
|VisitorsCoverage ...|           6146|       4.7|{'address': '2350...|{'website': 'http...|[{'categoryId': '...|
+--------------------+---------------+----------+--------------------+--------------------+-----

In [35]:
# renaming the columns
df = df.withColumnRenamed('_c0', 'id') \
    .withColumnRenamed('displayName', 'companyname') \
    .withColumnRenamed('trustScore', 'rating')
df.show(5)

+--------------------+---------------+------+--------------------+--------------------+--------------------+
|         companyname|numberOfReviews|rating|            location|             contact|          categories|
+--------------------+---------------+------+--------------------+--------------------+--------------------+
|Allianz Partners USA|          82240|   4.2|{'address': '9950...|{'website': 'http...|[{'categoryId': '...|
|               AARDY|          25453|   4.9|{'address': '1200...|{'website': 'http...|[{'categoryId': '...|
|            QEEQ.COM|          12298|   4.2|{'address': None,...|{'website': 'http...|[{'categoryId': '...|
|                IMG®|          11458|   4.6|{'address': '9200...|{'website': 'http...|[{'categoryId': '...|
|VisitorsCoverage ...|           6146|   4.7|{'address': '2350...|{'website': 'http...|[{'categoryId': '...|
+--------------------+---------------+------+--------------------+--------------------+--------------------+
only showing top 5 

In [10]:
#loacrtion column data
df.select('location').show(5)
df.select('location').take(5)

+--------------------+
|            location|
+--------------------+
|{'address': '9950...|
|{'address': '1200...|
|{'address': None,...|
|{'address': '9200...|
|{'address': '2350...|
+--------------------+
only showing top 5 rows



[Row(location="{'address': '9950 Mayland Drive', 'city': 'Richmond', 'zipCode': '23233', 'country': 'United States'}"),
 Row(location="{'address': '1200 South Pine Island Road', 'city': 'Plantation', 'zipCode': 'FL 33324', 'country': 'United States'}"),
 Row(location="{'address': None, 'city': None, 'zipCode': None, 'country': 'United States'}"),
 Row(location="{'address': '9200 Keystone Crossing', 'city': 'Indianapolis', 'zipCode': '46240', 'country': 'United States'}"),
 Row(location="{'address': '2350 Mission College Blvd, STE 1140', 'city': 'Santa Clara', 'zipCode': '95054', 'country': 'United States'}")]

In [36]:
# create new columns and extract address, city, zipcode, country from location column
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import IntegerType
location_schema = StructType([
                  StructField('address', StringType(), True),
                  StructField('city', StringType(), True),
                  StructField('zipCode', StringType(), True),
                  StructField('country', StringType(), True)
                ])
df = df.withColumn('location', regexp_replace(col('location'), "None", 'null'))
df_new = df.withColumn('location_json', from_json(col('location'), location_schema))
df_new.show(5)

+--------------------+---------------+------+--------------------+--------------------+--------------------+--------------------+
|         companyname|numberOfReviews|rating|            location|             contact|          categories|       location_json|
+--------------------+---------------+------+--------------------+--------------------+--------------------+--------------------+
|Allianz Partners USA|          82240|   4.2|{'address': '9950...|{'website': 'http...|[{'categoryId': '...|{9950 Mayland Dri...|
|               AARDY|          25453|   4.9|{'address': '1200...|{'website': 'http...|[{'categoryId': '...|{1200 South Pine ...|
|            QEEQ.COM|          12298|   4.2|{'address': null,...|{'website': 'http...|[{'categoryId': '...|{NULL, NULL, NULL...|
|                IMG®|          11458|   4.6|{'address': '9200...|{'website': 'http...|[{'categoryId': '...|{9200 Keystone Cr...|
|VisitorsCoverage ...|           6146|   4.7|{'address': '2350...|{'website': 'http...|[{'

In [37]:
df_new.select('location_json').take(5)

[Row(location_json=Row(address='9950 Mayland Drive', city='Richmond', zipCode='23233', country='United States')),
 Row(location_json=Row(address='1200 South Pine Island Road', city='Plantation', zipCode='FL 33324', country='United States')),
 Row(location_json=Row(address=None, city=None, zipCode=None, country='United States')),
 Row(location_json=Row(address='9200 Keystone Crossing', city='Indianapolis', zipCode='46240', country='United States')),
 Row(location_json=Row(address='2350 Mission College Blvd, STE 1140', city='Santa Clara', zipCode='95054', country='United States'))]

In [38]:
#unzip the location_json data
df_new = df_new.withColumn('address', df_new['location_json'].address)\
    .withColumn('city', df_new['location_json'].city)\
    .withColumn('zipcode', df_new['location_json'].zipCode)\
    .withColumn('country', df_new['location_json'].country)
df_new.show(5)

+--------------------+---------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------+-------------+
|         companyname|numberOfReviews|rating|            location|             contact|          categories|       location_json|             address|        city| zipcode|      country|
+--------------------+---------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------+-------------+
|Allianz Partners USA|          82240|   4.2|{'address': '9950...|{'website': 'http...|[{'categoryId': '...|{9950 Mayland Dri...|  9950 Mayland Drive|    Richmond|   23233|United States|
|               AARDY|          25453|   4.9|{'address': '1200...|{'website': 'http...|[{'categoryId': '...|{1200 South Pine ...|1200 South Pine I...|  Plantation|FL 33324|United States|
|            QEEQ.COM|          12298|   4.2|{'address': null,...

In [39]:
#method 2
from pyspark.sql.functions import when
df_new = df_new.withColumn('address', when(col('location_json').getItem('address').isNull(), 'N/A').otherwise(col('location_json').getItem('address')))\
    .withColumn('city', when(col('location_json').getItem('city').isNull(), 'N/A').otherwise(col('location_json').getItem('city')))\
    .withColumn('zipcode', when(col('location_json').getItem('zipCode').isNull(), 'N/A').otherwise(col('location_json').getItem('zipCode')))\
    .withColumn('country', when(col('location_json').getItem('country').isNull(), 'N/A').otherwise(col('location_json').getItem('country')))
df_new.show(5)

+--------------------+---------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------+-------------+
|         companyname|numberOfReviews|rating|            location|             contact|          categories|       location_json|             address|        city| zipcode|      country|
+--------------------+---------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------+-------------+
|Allianz Partners USA|          82240|   4.2|{'address': '9950...|{'website': 'http...|[{'categoryId': '...|{9950 Mayland Dri...|  9950 Mayland Drive|    Richmond|   23233|United States|
|               AARDY|          25453|   4.9|{'address': '1200...|{'website': 'http...|[{'categoryId': '...|{1200 South Pine ...|1200 South Pine I...|  Plantation|FL 33324|United States|
|            QEEQ.COM|          12298|   4.2|{'address': null,...

In [40]:
#contact details
df_new.select('contact').take(5)
# df_new.select('contact').dtypes

[Row(contact="{'website': 'http://www.allianztravelinsurance.com', 'email': 'help.team@allianzassistance.com', 'phone': '1-866-884-3556'}"),
 Row(contact="{'website': 'https://www.aardy.com/', 'email': 'Info@Aardy.com', 'phone': '6504926298'}"),
 Row(contact="{'website': 'http://www.qeeq.com', 'email': 'support@qeeq.com', 'phone': None}"),
 Row(contact="{'website': 'https://imglobal.com', 'email': 'ratingsandreviews@imglobal.com', 'phone': '1-317-655-4500'}"),
 Row(contact="{'website': 'https://www.VisitorsCoverage.com', 'email': 'support@visitorscoverage.com', 'phone': '1.866.384.9104'}")]

In [41]:
# create new columns website, email, phonenumber
contact_schema = StructType([
                  StructField('website', StringType(), True),
                  StructField('email', StringType(), True),
                  StructField('phone', StringType(), True)
                ])
df_new = df_new.withColumn('contact', regexp_replace(col('contact'), "None", 'null'))
df_new = df_new.withColumn('contact_json', from_json(col('contact'), contact_schema))
df_new.show(5)

+--------------------+---------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------+-------------+--------------------+
|         companyname|numberOfReviews|rating|            location|             contact|          categories|       location_json|             address|        city| zipcode|      country|        contact_json|
+--------------------+---------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------+-------------+--------------------+
|Allianz Partners USA|          82240|   4.2|{'address': '9950...|{'website': 'http...|[{'categoryId': '...|{9950 Mayland Dri...|  9950 Mayland Drive|    Richmond|   23233|United States|{http://www.allia...|
|               AARDY|          25453|   4.9|{'address': '1200...|{'website': 'http...|[{'categoryId': '...|{1200 South Pine ...|1200 South Pine I...|  Plantation|FL 33

In [42]:
df_new.select('contact_json').take(5)
# df_new.select('contact_json').dtypes

[Row(contact_json=Row(website='http://www.allianztravelinsurance.com', email='help.team@allianzassistance.com', phone='1-866-884-3556')),
 Row(contact_json=Row(website='https://www.aardy.com/', email='Info@Aardy.com', phone='6504926298')),
 Row(contact_json=Row(website='http://www.qeeq.com', email='support@qeeq.com', phone=None)),
 Row(contact_json=Row(website='https://imglobal.com', email='ratingsandreviews@imglobal.com', phone='1-317-655-4500')),
 Row(contact_json=Row(website='https://www.VisitorsCoverage.com', email='support@visitorscoverage.com', phone='1.866.384.9104'))]

In [71]:
df_new = df_new.withColumn('website', df_new['contact_json'].website)\
    .withColumn('email', df_new['contact_json'].email)\
    .withColumn('phonenumber', df_new['contact_json'].phone)
df_new.show(5)

+--------------------+---------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------+-------------+--------------------+--------------------+--------------------+--------------+--------------------+-----------------+
|         companyname|numberOfReviews|rating|            location|             contact|          categories|       location_json|             address|        city| zipcode|      country|        contact_json|             website|               email|   phonenumber|  cleaned_categories|parsed_categories|
+--------------------+---------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------+-------------+--------------------+--------------------+--------------------+--------------+--------------------+-----------------+
|Allianz Partners USA|          82240|   4.2|{'address': '9950...|{'website': 'http...|[

In [25]:
df_new.select('website', 'email', 'phonenumber').show(5)

+--------------------+--------------------+--------------+
|             website|               email|   phonenumber|
+--------------------+--------------------+--------------+
|http://www.allian...|help.team@allianz...|1-866-884-3556|
|https://www.aardy...|      Info@Aardy.com|    6504926298|
| http://www.qeeq.com|    support@qeeq.com|          NULL|
|https://imglobal.com|ratingsandreviews...|1-317-655-4500|
|https://www.Visit...|support@visitorsc...|1.866.384.9104|
+--------------------+--------------------+--------------+
only showing top 5 rows



In [72]:
df_new = df_new.withColumn("email", when(col("email").isNull(), 'N/A').otherwise(col("email")))
df_new = df_new.withColumn("phonenumber", when(col("phonenumber").isNull(), 0).otherwise(col("phonenumber")))
df_new = df_new.withColumn("website", when(col("website").isNull(), 'N/A').otherwise(col("website")))

In [73]:
df_new.select('website', 'email', 'phonenumber').show(5)

+--------------------+--------------------+--------------+
|             website|               email|   phonenumber|
+--------------------+--------------------+--------------+
|http://www.allian...|help.team@allianz...|1-866-884-3556|
|https://www.aardy...|      Info@Aardy.com|    6504926298|
| http://www.qeeq.com|    support@qeeq.com|             0|
|https://imglobal.com|ratingsandreviews...|1-317-655-4500|
|https://www.Visit...|support@visitorsc...|1.866.384.9104|
+--------------------+--------------------+--------------+
only showing top 5 rows



In [74]:
# prompt: how to clean the phonenumber column

import pyspark.sql.functions as F

# Remove all non-numeric characters from the phone number column
df_new = df_new.withColumn('phonenumber', F.regexp_replace('phonenumber', '[^0-9]', ''))

# # Remove leading zeros from the phone number column
df_new = df_new.withColumn('phonenumber', F.regexp_replace('phonenumber', '^1+', ''))

# Format the phone number column to a standard format
# df_new = df_new.withColumn('phonenumber', F.format_string('%s-%s-%s-',
#                                             F.substring('phonenumber', 1, 3),
#                                             F.substring('phonenumber', 4, 3),
#                                             F.substring('phonenumber', 7, 4)))

df_new = df_new.withColumn('phonenumber',
                           when(F.length(col('phonenumber')) == 10, F.concat(F.lit('1'), col('phonenumber')))
                           .otherwise(col('phonenumber')))


# Show the cleaned phone number column
df_new.select('phonenumber').show()


+-----------+
|phonenumber|
+-----------+
|18668843556|
|16504926298|
|          0|
|13176554500|
|18663849104|
|18003350611|
|16503976592|
|          0|
|18006993845|
|18006394727|
|          0|
|          0|
|13057674008|
|18886769977|
|          0|
|18003449540|
|          0|
|18662660505|
|          0|
|18006052282|
+-----------+
only showing top 20 rows



In [54]:
# schema = StructType([StructField("category", ArrayType(
#     StructType([
#         StructField("categoryId", StringType(), nullable=True),
#         StructField("displayName", StringType(), nullable=True),
#         StructField("isPredicted", BooleanType(), nullable=True)
#     ])
# ), True)])

In [55]:
# Define the schema for the JSON data
schema = ArrayType(StructType([
    StructField("categoryId", StringType(), True),
    StructField("displayName", StringType(), True),
    StructField("isPredicted", BooleanType(), True)
]))

In [57]:
from pyspark.sql.functions import col, from_json, udf, regexp_replace, explode, concat_ws, collect_set
# Define a UDF to clean the JSON strings
@udf(returnType=StringType())
def clean_json(json_str):
    return json_str.replace("'", "\"").replace("|", ",")

# Apply the UDF to clean the JSON strings
df_new = df_new.withColumn("cleaned_categories", clean_json(col("categories")))


In [58]:
df_new.show(5)


+--------------------+---------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------+-------------+--------------------+--------------------+--------------------+--------------+--------------------+
|         companyname|numberOfReviews|rating|            location|             contact|          categories|       location_json|             address|        city| zipcode|      country|        contact_json|             website|               email|   phonenumber|  cleaned_categories|
+--------------------+---------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------+-------------+--------------------+--------------------+--------------------+--------------+--------------------+
|Allianz Partners USA|          82240|   4.2|{'address': '9950...|{'website': 'http...|[{'categoryId': '...|{9950 Mayland Dri...|  9950 Maylan

In [59]:
df_new.select("cleaned_categories").show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|cleaned_categories                                                                                                                                                                                                                                                                                                                                                                            

In [60]:
cleaned_json_strings = df_new.select("cleaned_categories").rdd.map(lambda row: row[0]).collect()
for json_str in cleaned_json_strings:
    print(json_str)

[{"categoryId": "financial_consultant", "displayName": "Financial Consultant", "isPredicted": False}, {"categoryId": "insurance_agency", "displayName": "Insurance Agency", "isPredicted": False}, {"categoryId": "travel_insurance_company", "displayName": "Travel Insurance Company", "isPredicted": False}, {"categoryId": "travel_agency", "displayName": "Travel Agency", "isPredicted": False}]
[{"categoryId": "financial_consultant", "displayName": "Financial Consultant", "isPredicted": False}, {"categoryId": "travel_agency", "displayName": "Travel Agency", "isPredicted": False}, {"categoryId": "insurance_agency", "displayName": "Insurance Agency", "isPredicted": False}, {"categoryId": "travel_insurance_company", "displayName": "Travel Insurance Company", "isPredicted": False}]
[{"categoryId": "tourist_attraction", "displayName": "Tourist Attraction", "isPredicted": False}, {"categoryId": "car_rental_agency", "displayName": "Car Rental Agency", "isPredicted": False}, {"categoryId": "travel_in

In [61]:
# Parse the cleaned JSON data
df_new = df_new.withColumn("parsed_categories", from_json(col("cleaned_categories"), schema))

# Show the parsed data to inspect if parsing worked correctly
df_new.select("cleaned_categories", "parsed_categories").show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+
|cleaned_categories                                                                                                                                                                                                                                                                                                                                                          

In [62]:
df_new.select('parsed_categories').show(5)

+-----------------+
|parsed_categories|
+-----------------+
|             NULL|
|             NULL|
|             NULL|
|             NULL|
|             NULL|
+-----------------+
only showing top 5 rows



In [75]:
df_new = df_new.drop( 'location', 'location_json', 'contact', 'contact_json', 'parsed_categories')
df_new.show(5)

+--------------------+---------------+------+--------------------+--------------------+------------+--------+-------------+--------------------+--------------------+-----------+--------------------+
|         companyname|numberOfReviews|rating|          categories|             address|        city| zipcode|      country|             website|               email|phonenumber|  cleaned_categories|
+--------------------+---------------+------+--------------------+--------------------+------------+--------+-------------+--------------------+--------------------+-----------+--------------------+
|Allianz Partners USA|          82240|   4.2|[{'categoryId': '...|  9950 Mayland Drive|    Richmond|   23233|United States|http://www.allian...|help.team@allianz...|18668843556|[{"categoryId": "...|
|               AARDY|          25453|   4.9|[{'categoryId': '...|1200 South Pine I...|  Plantation|FL 33324|United States|https://www.aardy...|      Info@Aardy.com|16504926298|[{"categoryId": "...|
|    

In [None]:
#cleaned_categories is pending ...