In [1]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Tokenizer
import pandas 

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
all_data = spark.read.csv("alldata.csv", header = True, inferSchema = True)
companies = spark.read.csv("companies.csv", header = True, inferSchema = True)

In [4]:
# check for repeating columns of all_Data in companies and rename by adding all_data
for all_data_col in all_data.columns:
    if all_data_col in companies.columns:
        all_data = all_data.withColumnRenamed(all_data_col, all_data_col + '_all_data')
        

In [5]:
#join the two datasets
joined_df = companies.join(all_data, companies["company name"] == all_data["company"], "inner")

In [6]:
location = all_data.select('location').dropna()

In [7]:
#split location to get city
city = location.select(F.split('location', ' ')[0].alias('city'))

In [8]:
city = city[city.city.contains(',')]
city.show()

+--------+
|    city|
+--------+
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
+--------+
only showing top 20 rows



In [9]:
#create a temview
city.createOrReplaceTempView('city')

In [20]:
city_freq = spark.sql("SELECT city, COUNT(city) AS frequency FROM city GROUP BY city ORDER BY frequency ")

In [21]:
joined_df.createOrReplaceTempView('joined_df')

In [22]:
industry_freq = spark.sql("SELECT industry, COUNT(industry) AS frequency FROM joined_df GROUP BY industry")

In [42]:
#Write a function to generate n-grams (unigram & bigram) from a given text/description.
def generateNgrams(ngram_text, column_name):
    tokenizer = Tokenizer(inputCol = column_name, outputCol = 'unigram')
    ngram = NGram(n = 2, inputCol = 'unigram', outputCol = 'ngram')
    unigram = tokenizer.transform(ngram_text)
    bigram = ngram.transform(unigram)
    return bigram.select(column_name, 'frequency', 'ngram')

In [43]:
def createNgramDF():
    city_freq_ngram = generateNgrams(city_freq, 'city')
    companies_ngram = generateNgrams(industry_freq, 'industry')
    return city_freq_ngram , companies_ngram

In [44]:
city_freq_ngram = createNgramDF()[0]
industry_freq_ngram = createNgramDF()[1]

In [45]:
city_freq_ngram.show(100)

+-----------+---------+-----+
|       city|frequency|ngram|
+-----------+---------+-----+
|     Union,|        1|   []|
|  Lynbrook,|        1|   []|
| Fairfield,|        1|   []|
|  Berkeley,|        1|   []|
| Allendale,|        1|   []|
|Parsippany,|        1|   []|
|  Martinez,|        1|   []|
| Manhasset,|        1|   []|
|Burlingame,|        1|   []|
|   Hayward,|        2|   []|
|   Belmont,|        2|   []|
| Manhattan,|        2|   []|
|  Richmond,|        2|   []|
|Emeryville,|        2|   []|
|  Brooklyn,|        3|   []|
|   Alameda,|        4|   []|
|   Boulder,|        8|   []|
|   Oakland,|        9|   []|
|   Redmond,|       17|   []|
| Sunnyvale,|       20|   []|
|    Austin,|       26|   []|
|   Atlanta,|       30|   []|
|   Chicago,|       42|   []|
|Washington,|       46|   []|
|   Seattle,|       65|   []|
|    Boston,|       72|   []|
| Cambridge,|       75|   []|
+-----------+---------+-----+



In [46]:
industry_freq_ngram.show(20)

+--------------------+---------+--------------------+
|            industry|frequency|               ngram|
+--------------------+---------+--------------------+
| they provide inv...|       10|[ they, they prov...|
|   Computer Hardware|        1| [computer hardware]|
| sale or distribu...|       94|[ sale, sale or, ...|
|           Insurance|        1|                  []|
|   Health Care Plans|        1|[health care, car...|
| http://www.ise.com/|        6|                  []|
| services and pro...|       94|[ services, servi...|
| depositary prefe...|       31|[ depositary, dep...|
| exchange-traded ...|       31|[ exchange-traded...|
|Consumer Packaged...|        1|[consumer package...|
| services or tech...|        2|[ services, servi...|
| plus any borrowi...|        1|[ plus, plus any,...|
|     Credit Services|        3|   [credit services]|
|        construction|        3|     [ construction]|
| ""2020 Bonds"")....|        1|[ ""2020, ""2020 ...|
|     Medical Devices|      