In [3]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=caaae45303601eb1a0a54279c85efd8f3e48dea27b17d99e7cb83f0ba658e9ba
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


# Data Application Programming

This exercise will involve working with a Spark RDD dataset of forum posts.

Each forum post has various attributes, such as the author, the URL and title of the post, the number of points the post has, when it was created, and so on.

# Setup

You will need pyspark installed on your machine.

You can then run the lines below to start a new Spark context and load the dataset:

In [69]:
import json
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
print(sc)
print("Ready to go!")

dataset_json = sc.textFile(r"/content/drive/MyDrive/test1 (5 questions) - python/HNStories.json")
dataset = dataset_json.map(lambda x: json.loads(x))
dataset.persist()

<SparkContext master=local[*] appName=pyspark-shell>
Ready to go!


PythonRDD[10] at RDD at PythonRDD.scala:53
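The loading cell above calls `json.loads` once per line, which implies the file holds one JSON object per line. A minimal plain-Python sketch of that parsing step is shown below; the first sample line reuses the record printed later in this notebook, while the second line and the name `sample_lines` are made up for illustration.

```python
import json

# Two sample lines in the one-JSON-object-per-line layout the loader assumes.
# The first matches the record printed later in this notebook; the second is hypothetical.
sample_lines = [
    '{"author": "TuxLyn", "created_at": "2014-05-29T08:25:40Z", '
    '"created_at_i": 1401351940, "num_comments": 0, "objectID": "7815290", '
    '"points": 1, "title": "DuckDuckGo Settings", '
    '"url": "https://duckduckgo.com/settings"}',
    '{"author": "example_user", "points": 250, "title": "Hypothetical post"}',
]

# Same transformation as dataset_json.map(lambda x: json.loads(x)), done locally:
# each text line becomes one Python dictionary.
records = [json.loads(line) for line in sample_lines]

print(records[0]["title"])
print(sorted(records[1].keys()))
```

Each parsed element is an ordinary dictionary, so the later exercises can read fields such as `record["points"]` directly.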

In [70]:
type(dataset)

pyspark.rdd.PipelinedRDD

## Q1 [5 marks]
## Counting elements in a Spark dataset

Write a function which returns the number of elements in a dataset loaded on Spark. 

This function should take one parameter named `dataset`, of type Spark RDD.

It should return the number of elements in the Spark RDD.

In [71]:
def count_elements(dataset):
  return dataset.count()

print("The Count of elements in a Dataset is :",count_elements(dataset))

The Count of elements in a Dataset is : 166725


## Q2 [5 marks]
## Getting the first element

Write a function which returns the first element of a dataset loaded on Spark.

This function should take one parameter named `dataset`, of type Spark RDD.

It should return the first element of the Spark RDD.

In [72]:
def first_record(dataset):
  return dataset.first()   # Fetch only the first element instead of collecting the whole RDD to the driver

print(first_record(dataset))

{'author': 'TuxLyn', 'created_at': '2014-05-29T08:25:40Z', 'created_at_i': 1401351940, 'num_comments': 0, 'objectID': '7815290', 'points': 1, 'title': 'DuckDuckGo Settings', 'url': 'https://duckduckgo.com/settings'}


## Q3 [10 marks]
## Getting all the attributes of an element

Each element of a Spark RDD is a dictionary of key:value pairs.

Write a function which finds all of the *unique* attributes used throughout a Spark RDD.

This function should take one parameter named `dataset`, of type Spark RDD.

It should return a list.
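To make the task concrete: the "attributes" are the dictionary keys, not the values stored under them, and different records may carry different keys. The plain-Python sketch below illustrates the expected result on two made-up records (the record contents and the name `unique_keys` are hypothetical).

```python
# Two hypothetical records with partly different keys.
records = [
    {"author": "a", "points": 1},
    {"author": "b", "title": "t", "url": "u"},
]

# Union the key sets of all records, then sort for a stable order.
unique_keys = sorted(set().union(*(record.keys() for record in records)))

print(unique_keys)  # ['author', 'points', 'title', 'url']
```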

In [73]:
def unique_attributes(dataset):
  # Emit every key of every record, then keep each distinct key once
  keys = dataset.flatMap(lambda record: record.keys()).distinct()
  return sorted(keys.collect())                  # Returning the unique attribute names as a sorted list

print(unique_attributes(dataset))

## Q4 [10 marks]
## Finding the earliest timestamp of a dataset

The machine-readable timestamp of an element of a Spark RDD is stored as the attribute `created_at_i`.

Write a function which finds the minimum (i.e. the earliest) timestamp of a Spark RDD.

This function should take one parameter named `dataset`, of type Spark RDD.

It should return a more human-readable `datetime` object.

(A helper function is provided for converting the Spark RDD timestamps to a `datetime` object.)

In [74]:
from datetime import datetime as dt

def extract_time(timestamp):
    return dt.utcfromtimestamp(timestamp)    # Converting a Unix timestamp (in seconds) to a naive UTC datetime

# Your code here
def mintimestamp(dataset):
  # Computing the minimum inside Spark instead of collecting every record to the driver
  ear_time = dataset.map(lambda record: record['created_at_i']).min()
  return extract_time(ear_time)              # Passing the minimum value to the extract_time function

print(mintimestamp(dataset))

2006-10-09 18:30:28
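The helper above relies on `datetime.utcfromtimestamp`, which returns a naive `datetime` and is deprecated as of Python 3.12. A minimal sketch of a timezone-aware alternative follows; the name `extract_time_utc` is hypothetical and not part of the exercise, and the sample timestamp is the `created_at_i` value of the first record shown earlier.

```python
from datetime import datetime, timezone


def extract_time_utc(timestamp):
    """Hypothetical variant of the helper: return a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


# created_at_i of the first record, whose created_at field is '2014-05-29T08:25:40Z'
print(extract_time_utc(1401351940).isoformat())  # 2014-05-29T08:25:40+00:00
```

The printed value agrees with the record's own `created_at` string, which is a quick sanity check that the timestamps are in seconds and in UTC.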


## Q5 [20 marks]
## Calculating the proportion of successful posts per author

A post is successful if it has strictly more than 200 points.

Write a function to find the proportion of posts in the Spark RDD dataset which are successful, for each author in the dataset.

This function should take one parameter named `dataset`, of type Spark RDD.

It should return a Spark RDD showing the proportion of successful posts per author.

Note: If an entry in the dataset has no value for author, use the string `unknown` instead.

In [75]:
def successful_ratio(dataset):
  import pandas as pd

  df = pd.DataFrame(dataset.collect())                   # Converting the Spark RDD to a pandas DataFrame
  df1 = df[df['points'] > 200]                           # Extracting the posts which have more than 200 points
  author_total = df.author.value_counts().to_dict()      # Total number of posts per author, stored in a dictionary
  author_success = df1.author.value_counts().to_dict()   # Number of successful posts per author, stored in a dictionary

  a = {}                                                 # Empty Dictionary
  for key,value in author_success.items():               # Iterating through the successful authors Post
    
    # Item is stored in the format of {Author_Name : "Successful_Post : Total_Post"}
    
    a[key] = f"{value} : {author_total[key]}"
  rdd = sc.parallelize([a])                              # Converting a Dictionary to Spark RDD
  return rdd

print(successful_ratio(dataset))
print(successful_ratio(dataset).collect())

ParallelCollectionRDD[12] at readRDDFromFile at PythonRDD.scala:274
[{'pg': '11 : 79', 'llambda': '11 : 320', 'ColinWright': '7 : 411', 'Libertatea': '7 : 228', 'johns': '5 : 56', 'cperciva': '5 : 26', 'whoishiring': '5 : 13', 'jashkenas': '4 : 42', 'ssclafani': '4 : 134', 'petercooper': '4 : 50', 'mtgx': '4 : 158', 'bpierre': '3 : 50', 'cwan': '3 : 686', 'robin_reala': '3 : 45', 'swombat': '3 : 51', 'danso': '3 : 278', 'dave1010uk': '3 : 22', 'pascal07': '3 : 24', 'sahillavingia': '3 : 71', 'tanoku': '3 : 3', 'thomholwerda': '3 : 6', 'gruseom': '3 : 76', 'soundsop': '3 : 112', 'vyrotek': '3 : 28', 'olivercameron': '3 : 20', 'kn0thing': '2 : 23', 'lionhearted': '2 : 35', 'johnnytee': '2 : 9', 'CWIZO': '2 : 4', 'MaysonL': '2 : 99', 'pitdesi': '2 : 70', 'bjonathan': '2 : 104', 'davidw': '2 : 92', 'uladzislau': '2 : 104', 'ck2': '2 : 60', 'mootothemax': '2 : 31', 'ugh': '2 : 12', 'jaf12duke': '2 : 19', 'timf': '2 : 74', 'friggeri': '2 : 11', 'sasvari': '2 : 32', 'yummyfajitas': '2 : 53', 
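
The cell above reports `successful : total` strings, and only for authors with at least one successful post. As a hedged alternative, the sketch below keeps the work inside Spark and returns a numeric proportion for every author, substituting `unknown` where the author is missing. The function name `successful_proportions` and the `threshold` parameter are hypothetical; only the standard RDD methods `map`, `reduceByKey` and `mapValues` are assumed.

```python
def successful_proportions(dataset, threshold=200):
    """Sketch: return an RDD of (author, proportion of successful posts) pairs.

    `dataset` is assumed to be an RDD of dictionaries like the one loaded above.
    """

    def to_pair(record):
        # Missing or empty authors are grouped under the string 'unknown'.
        author = record.get("author") or "unknown"
        # A post is successful if it has strictly more than `threshold` points;
        # a missing or null points value is treated as 0 (an assumption).
        is_successful = 1 if (record.get("points") or 0) > threshold else 0
        return (author, (is_successful, 1))

    return (
        dataset.map(to_pair)
        # Sum (successful, total) counts per author.
        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
        # Turn the two counts into a proportion between 0 and 1.
        .mapValues(lambda counts: counts[0] / counts[1])
    )
```

With the notebook's `dataset`, something like `successful_proportions(dataset).take(5)` would show a few `(author, proportion)` pairs without collecting the full result to the driver.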