In [48]:
#!pip install pyspark

In [105]:
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext() 
config = sc.getConf()
config.set('spark.cores.max','4')
config.set('spark.executor.memory', '8G')
config.set('spark.driver.maxResultSize', '8g')
config.set('spark.kryoserializer.buffer.max', '512m')
config.set("spark.driver.cores", "4")

sc.stop()

In [106]:
sc = SparkContext(conf = config) 
sqlContext = SQLContext(sc)
spark = SparkSession(sc)



In [107]:
news = spark.read.option("multiline","true") \
      .json("news.json")
news.columns

['author',
 'company',
 'content',
 'description',
 'publishedAt',
 'source',
 'title',
 'url',
 'urlToImage']

In [108]:
news.select('company', 'source', 'publishedAt', 'title', 'description').show()  

+-------+--------------------+--------------------+--------------------+--------------------+
|company|              source|         publishedAt|               title|         description|
+-------+--------------------+--------------------+--------------------+--------------------+
|     3M|{bbc-news, BBC News}|2022-04-14T23:39:47Z|The book that san...|A jewel-encrusted...|
|     3M|  {reuters, Reuters}|2022-04-07T18:08:00Z|Judge blasts 3M's...|The federal judge...|
|     3M|  {reuters, Reuters}|2022-04-08T21:27:00Z|3M benchslapped f...|Critics of consol...|
|     3M|  {reuters, Reuters}|2022-03-25T23:11:00Z|Jury says 3M owes...|A federal jury on...|
|     3M|  {reuters, Reuters}|2022-04-07T11:18:00Z|Mexico labor refo...|A top U.S. labor ...|
|     3M|  {reuters, Reuters}|2022-03-30T20:54:00Z|Factbox: Recessio...|The closely watch...|
|     3M|{null, The Guardian}|2022-04-13T11:49:52Z|China’s lockdowns...|Organisation lowe...|
|     3M|{null, The Guardian}|2022-04-12T18:30:30Z|The Great

In [109]:
# the total number of news
news.count()

9950

In [110]:
# how many companies are included
companycount=news.select(countDistinct("company"))
companycount.show()

+-----------------------+
|count(DISTINCT company)|
+-----------------------+
|                    505|
+-----------------------+



In [111]:
# find how many companies have less than 20 pieces of news
from pyspark.sql.functions import *

def count_col_dups(df, col_name):
    df_dupes = df.groupBy(col_name) \
                 .agg(count(col_name).alias("count")) \
                 .filter(col('count') < 20) 
    return df_dupes

dupes = count_col_dups(news, 'company')
dupes.show(truncate=False)
dupes.count()

+-------------------------+-----+
|company                  |count|
+-------------------------+-----+
|KLA Corporation          |19   |
|Allstate Corp            |12   |
|Duke Realty Corp         |11   |
|IDEX Corporation         |19   |
|Under Armour (Class C)   |9    |
|Ameren Corp              |7    |
|Healthpeak Properties    |14   |
|LKQ Corporation          |5    |
|Loews Corporation        |9    |
|Howmet Aerospace         |19   |
|Fox Corporation (Class B)|18   |
|TransDigm Group          |17   |
|W. W. Grainger           |11   |
|Martin Marietta Materials|7    |
|Copart                   |14   |
|JM Smucker               |3    |
|Kimco Realty             |18   |
|Cintas Corporation       |17   |
|W. R. Berkley Corporation|1    |
+-------------------------+-----+



19

In [113]:
# create a new column that only have publish date no exact time
news1 = news.withColumn('publish date', substring('publishedAt', 1,10))
news1.select('company', 'publishedAt','publish date').show()

+-------+--------------------+------------+
|company|         publishedAt|publish date|
+-------+--------------------+------------+
|     3M|2022-04-14T23:39:47Z|  2022-04-14|
|     3M|2022-04-07T18:08:00Z|  2022-04-07|
|     3M|2022-04-08T21:27:00Z|  2022-04-08|
|     3M|2022-03-25T23:11:00Z|  2022-03-25|
|     3M|2022-04-07T11:18:00Z|  2022-04-07|
|     3M|2022-03-30T20:54:00Z|  2022-03-30|
|     3M|2022-04-13T11:49:52Z|  2022-04-13|
|     3M|2022-04-12T18:30:30Z|  2022-04-12|
|     3M|2022-04-14T17:30:08Z|  2022-04-14|
|     3M|2022-04-15T08:55:27Z|  2022-04-15|
|     3M|2022-04-10T12:06:39Z|  2022-04-10|
|     3M|2022-04-10T05:20:49Z|  2022-04-10|
|     3M|2022-04-13T21:49:13Z|  2022-04-13|
|     3M|2022-04-02T16:38:15Z|  2022-04-02|
|     3M|2022-03-25T06:00:20Z|  2022-03-25|
|     3M|2022-04-10T04:00:09Z|  2022-04-10|
|     3M|2022-04-02T06:00:43Z|  2022-04-02|
|     3M|2022-04-08T21:56:05Z|  2022-04-08|
|     3M|2022-04-13T05:00:44Z|  2022-04-13|
|     3M|2022-03-22T18:55:51Z|  

In [114]:
# Since API also sourced the news that have company name in description and content not in the title, 
# and most of these articles have less relevancy with the company, so we only select the news that include 
# company name in the title
from pyspark.sql.functions import *
query1=news1.filter(col("title").contains(news.company))
query1.select('company','title','publish date').show()
query1.count()

+--------------------+--------------------+------------+
|             company|               title|publish date|
+--------------------+--------------------+------------+
|                  3M|Judge blasts 3M's...|  2022-04-07|
|                  3M|3M benchslapped f...|  2022-04-08|
|                  3M|Jury says 3M owes...|  2022-03-25|
| Abbott Laboratories|Abbott Laboratori...|  2022-04-20|
| Abbott Laboratories|3 Things About Ab...|  2022-04-19|
|              AbbVie|Texas Demands Inf...|  2022-03-24|
|              AbbVie|Is AbbVie Stock a...|  2022-03-22|
|             Abiomed|Abiomed Fourth Qu...|  2022-04-08|
|             Abiomed|Buy the Dip: Digi...|  2022-04-04|
|             Abiomed|Abiomed slips as ...|  2022-04-04|
|             Abiomed|Abiomed, AtriCure...|  2022-04-19|
|             Abiomed|Abiomed Impella 5...|  2022-04-06|
| Activision Blizzard|Activision Blizza...|  2022-03-24|
| Activision Blizzard|Governor Newsom f...|  2022-04-13|
| Activision Blizzard|Activisio

889

In [184]:
# Then, we can sort the news based on relevancy, return the news that have company name 
# in title first, in description second, and in content last
from pyspark.sql.functions import when
query2 = news1.withColumn("relevancy", when(col("title").contains(news.company), 1)
                                            .when(col("description").contains(news.company), 2)
                                            .otherwise(3))
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
query3 = Window.partitionBy("company").orderBy("relevancy")
query4 = query2.withColumn("order", row_number().over(query3))\
          .select('order', 'company','title', 'publish date', 'description', 'url')

In [185]:
# We can find the news in the order of relevancy for each company by changing the company name
# eg:
query4.filter(query4.company=="Apple").show()

+-----+-------+--------------------+------------+--------------------+--------------------+
|order|company|               title|publish date|         description|                 url|
+-----+-------+--------------------+------------+--------------------+--------------------+
|    1|  Apple|Every Store You C...|  2022-04-20|I use Apple Pay f...|https://lifehacke...|
|    2|  Apple|Apple reportedly ...|  2022-03-30|Apple is well-kno...|https://www.engad...|
|    3|  Apple|Apple settles voi...|  2022-03-22|Following years o...|https://www.engad...|
|    4|  Apple|Apple Studio Disp...|  2022-04-11|Some owners of th...|https://www.engad...|
|    5|  Apple|Apple is reported...|  2022-03-24|Apple's iPhones a...|https://www.engad...|
|    6|  Apple|Apple signs an ex...|  2022-04-13|Apple\r\n is look...|https://www.engad...|
|    7|  Apple|Apple buys UK-bas...|  2022-03-23|Apple\r\n has acq...|https://www.engad...|
|    8|  Apple|Apple may release...|  2022-03-27|Bloomberg reporte...|https://ww