In [1]:
!pip install -U pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=61249e6544f5073431e82188769998470b269f8a649622d1de582a94bf03a61b
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Obtain a Spark session with default settings and report its version.
spark = SparkSession.builder.getOrCreate()
print("Using Apache Spark Version", spark.version)

# Keep a handle on the session's SparkContext as well.
sc = spark.sparkContext

Using Apache Spark Version 3.5.1


In [37]:
import os
import sys
# Point both PySpark interpreter variables at the interpreter running this kernel.
# NOTE(review): a Spark session was already created in an earlier cell; these
# variables are presumably only read when the SparkContext starts — confirm that
# this cell runs early enough to matter.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [38]:
from pyspark.sql import SparkSession

# Build the named, configured session used by the rest of the notebook.
# NOTE(review): an earlier cell already called getOrCreate(); confirm that the
# settings below actually take effect on a session that is already running.
spark = (
    SparkSession.builder
    .appName("Intro to Apache Spark")
    .config("spark.cores.max", "4")
    .config("spark.executor.memory", "8G")
    .config("spark.driver.maxResultSize", "8g")
    .config("spark.kryoserializer.buffer.max", "512m")
    .config("spark.driver.cores", "4")
    .getOrCreate()
)

print("Using Apache Spark Version", spark.version)

# Handle on the session's SparkContext.
sc = spark.sparkContext

Using Apache Spark Version 3.5.1


In [39]:
# Read the organisations CSV: first row is the header, fields are
# comma-separated, and column types are inferred from the data.
cb_sdf = (
    spark.read
    .options(header="true", delimiter=",", inferSchema="true")
    .csv("/crunchbase_odm_orgs (1).csv")
)

# Print the resulting column names and types.
cb_sdf.printSchema()

root
 |-- uuid: string (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- primary_role: string (nullable = true)
 |-- cb_url: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- homepage_url: string (nullable = true)
 |-- logo_url: string (nullable = true)
 |-- facebook_url: string (nullable = true)
 |-- twitter_url: string (nullable = true)
 |-- linkedin_url: string (nullable = true)
 |-- combined_stock_symbols: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- short_description: string (nullable = true)



In [42]:
# Discard rows that have no short_description, then report how many rows remain.
cb_sdf = cb_sdf.dropna(subset=["short_description"])
cb_sdf.count()

1127655

In [46]:
# 1. Find all companies with a name that is only two words, i.e. two runs of
#    word characters separated by a single space.
# The pattern is a raw string so that `\w` is passed to Spark's regex engine
# unchanged instead of being parsed as an (invalid) Python escape sequence.
filtered_cb_sdf = cb_sdf.filter(cb_sdf.name.rlike(r"^\w+ \w+$"))

# Show only the name and location columns, without truncating long values.
result_df = filtered_cb_sdf.select("name", "city", "region", "country_code")
result_df.show(truncate=False)

+---------------------+----------------+----------+------------+
|name                 |city            |region    |country_code|
+---------------------+----------------+----------+------------+
|Time Warner          |New York        |New York  |USA         |
|Goldman Sachs        |New York        |New York  |USA         |
|Jingle Networks      |New York        |New York  |USA         |
|Hearst Communications|New York        |New York  |USA         |
|Ning Interactive     |Menlo Park      |California|USA         |
|Prosper Marketplace  |San Francisco   |California|USA         |
|Tribune Media        |Chicago         |Illinois  |USA         |
|Aggregate Knowledge  |San Mateo       |California|USA         |
|Zing Systems         |Mountain View   |California|USA         |
|Amie Street          |Long Island City|New York  |USA         |
|Legg Mason           |Baltimore       |Maryland  |USA         |
|Haute Secure         |Seattle         |Washington|USA         |
|Squid Labs           |Al

In [47]:
# Return the number of companies with two word names.
# Raw string: keeps `\w` from being parsed as an invalid Python escape sequence.
cb_sdf.filter(cb_sdf.name.rlike(r"^\w+ \w+$")).count()

336764

In [49]:
# 2. Companies located in the state of California: keep the rows whose
#    "region" equals "California".
filtered_cb_sdf_2 = cb_sdf.where(cb_sdf["region"] == "California")

# Project the name and location columns and print them in full width.
location_columns = ["name", "city", "region", "country_code"]
result_df_2 = filtered_cb_sdf_2.select(*location_columns)
result_df_2.show(truncate=False)

+---------------------+--------------+----------+------------+
|name                 |city          |region    |country_code|
+---------------------+--------------+----------+------------+
|Zoho                 |Pleasanton    |California|USA         |
|Facebook             |Menlo Park    |California|USA         |
|Accel                |Palo Alto     |California|USA         |
|Omnidrive            |Palo Alto     |California|USA         |
|Geni                 |West Hollywood|California|USA         |
|Flektor              |Culver City   |California|USA         |
|Fox Interactive Media|Beverly Hills |California|USA         |
|Twitter              |San Francisco |California|USA         |
|StumbleUpon          |San Francisco |California|USA         |
|Scribd               |San Francisco |California|USA         |
|Slacker              |San Diego     |California|USA         |
|Lala                 |Palo Alto     |California|USA         |
|Helio                |Los Angeles   |California|USA   

In [50]:
# How many companies have "California" as their region?
cb_sdf.where(cb_sdf["region"] == "California").count()

94871

In [53]:
# 3. Add a "Blog" column to the DataFrame with the row entries set to 1 if the
#    "domain" field contains "blogspot.com", and 0 otherwise.
# `contains` evaluates to NULL when "domain" itself is NULL, so after the cast
# to int the remaining NULLs are filled with 0 to honour the "0 otherwise" rule.
cb_sdf = (
    cb_sdf
    .withColumn("Blog", cb_sdf.domain.contains("blogspot.com").astype("int"))
    .na.fill(0, subset=["Blog"])
)
cb_sdf.show()


+--------------------+--------------------+------------+------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+--------------+----------+------------+--------------------+----+
|                uuid|                name|        type|primary_role|              cb_url|         domain|        homepage_url|            logo_url|        facebook_url|         twitter_url|        linkedin_url|combined_stock_symbols|          city|    region|country_code|   short_description|Blog|
+--------------------+--------------------+------------+------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+--------------+----------+------------+--------------------+----+
|e1393508-30ea-8a3...|            Wetpaint|organization|     company|https://www.crunc...|   wetpain

In [55]:
# Show name, location (city, region, country_code) and "Blog" for the companies
# whose "Blog" flag is 1.
filtered_cb_sdf_3 = cb_sdf.where(cb_sdf["Blog"] == 1)

blog_columns = ["name", "city", "region", "country_code", "Blog"]
result_df_3 = filtered_cb_sdf_3.select(*blog_columns)
result_df_3.show(truncate=False)


+--------------------------+-------------+------------+------------+----+
|name                      |city         |region      |country_code|Blog|
+--------------------------+-------------+------------+------------+----+
|Sad Urdu Poetry           |San Antonio  |Texas       |USA         |1   |
|The Tech-Freak            |Sheffield    |Sheffield   |GBR         |1   |
|Ma.Gnolia                 |San Francisco|California  |USA         |1   |
|Dynasty Online            |NULL         |NULL        |NULL        |1   |
|Hire-seo                  |NULL         |NULL        |NULL        |1   |
|YelloYello                |Rijswijk     |Zuid-Holland|NLD         |1   |
|Youtubehiphop             |São Paulo    |Sao Paulo   |BRA         |1   |
|Payday advances           |NULL         |NULL        |NULL        |1   |
|Blog Traffic Exchange     |Menlo Park   |California  |USA         |1   |
|Sirius Forex Trading Group|NULL         |NULL        |NULL        |1   |
|Utilsforge                |Delaware  

In [58]:
# Number of companies whose "Blog" flag is 1.
cb_sdf.where(cb_sdf["Blog"] == 1).count()

394

In [56]:
# 4. Find all companies whose name is a palindrome (reads the same forward and
#    in reverse, e.g. madam).
# NOTE(review): the task statement asks for a Spark UDF; this cell compares the
# name against the built-in `reverse` column function instead. The comparison is
# an exact string equality, so letter case must match as well.
from pyspark.sql.functions import reverse

name_col = cb_sdf["name"]
filtered_cb_sdf_4 = cb_sdf.where(name_col == reverse(name_col))

# Show the name and location columns in full width.
result_df_4 = filtered_cb_sdf_4.select("name", "city", "region", "country_code")
result_df_4.show(truncate=False)

+------+-------------+--------------+------------+
|name  |city         |region        |country_code|
+------+-------------+--------------+------------+
|KAYAK |Stamford     |Connecticut   |USA         |
|ooVoo |New York     |New York      |USA         |
|63336 |London       |England       |GBR         |
|TipiT |Delft        |Zuid-Holland  |NLD         |
|beweb |Auckland     |Auckland      |NZL         |
|CSC   |Falls Church |Virginia      |USA         |
|CBC   |Ottawa       |Ontario       |CAN         |
|OQO   |San Francisco|California    |USA         |
|SAS   |Cary         |North Carolina|USA         |
|e4e   |Santa Clara  |California    |USA         |
|PHP   |Little Rock  |Arkansas      |USA         |
|ivi   |Moscow       |Moscow City   |RUS         |
|ADDA  |Bangalore    |Karnataka     |IND         |
|izeezi|Chippenham   |Wiltshire     |GBR         |
|siXis |Durham       |North Carolina|USA         |
|STATS |Chicago      |Illinois      |USA         |
|8x8   |San Jose     |Californi

In [57]:
# Number of companies whose name equals its own reversal (palindromes).
cb_sdf.where(cb_sdf["name"] == reverse(cb_sdf["name"])).count()

808