## Import & Install PySpark

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=dc71a1e0e2dfe742311852689eb640464cbd09f15cddae3132c37e2308e47f2f
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
import pyspark
from pyspark.sql import SparkSession
# Reuse the running SparkContext if this cell is executed again: calling the
# SparkContext constructor a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once". The master 'local[*]' runs Spark
# locally with as many worker threads as there are logical cores; the conf is
# only applied when no context exists yet.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))

### pyspark.sql.SparkSession Main entry point for DataFrame and SQL functionality.
#### The entry point to programming Spark with the Dataset and DataFrame API.

A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files. To create a SparkSession, use the following builder pattern:
[Source](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession)

In [3]:
# Build the SparkSession used for the DataFrame and SQL examples below.
# getOrCreate() hands back an existing session when there is one, so the
# options set here only fully apply when nothing has been started yet.
spark = (
    SparkSession.builder
    .master("local")
    .appName("cs5540")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# We can create an RDD in two ways:
- parallelize()

- textFile()

### parallelize()  to create RDD

In [6]:
# Source data: the integers 0..999 as a Python range.
big_list = range(1000)

# Turn the range into an RDD, distributing the data across 2 partitions.
rdd = sc.parallelize(big_list, 2)

# filter() returns a new RDD containing only the elements that satisfy the
# predicate; the filtering itself runs in a distributed manner across
# several CPUs or computers.
odds = rdd.filter(lambda n: n % 2 == 1)   # keep the odd numbers
evens = rdd.filter(lambda n: n % 2 == 0)  # keep the even numbers

# Show what kind of object each step produced, one type per line.
print(type(big_list), type(rdd), type(odds), type(evens), sep="\n")

# take(10) pulls just the first 10 elements of each RDD from the distributed
# system back onto this machine, which is a cheap way to peek at the contents.
print(odds.take(10))
print(evens.take(10))

<class 'range'>
<class 'pyspark.rdd.PipelinedRDD'>
<class 'pyspark.rdd.PipelinedRDD'>
<class 'pyspark.rdd.PipelinedRDD'>
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


# textFile() to create RDD

In [8]:
# Load the text file as an RDD of lines.
txt = sc.textFile('/content/links.txt')
print(type(txt))

# Total number of lines in the file.
print(txt.count())

# Keep only the lines that mention "python", ignoring case.
python_lines = txt.filter(lambda row: 'python' in row.lower())

# Number of lines that mention "python".
print(python_lines.count())

<class 'pyspark.rdd.RDD'>
28
4


## Lazy Evaluation
Transformations are not computed right away. Instead, Spark just remembers the transformations applied to some base dataset. Spark computes the transformations only when an action requires a result to be returned to the driver program.


## Reading a Tweet JSON File

In [9]:
path_to_input = '/content/tweet.json'

# wholeTextFiles() yields (file path, file content) pairs; .values() keeps only
# the content, so each file is handed to the JSON reader as a single string.
# NOTE(review): presumably done because the tweet JSON spans multiple lines - confirm.
tweet_json_rdd = sc.wholeTextFiles(path_to_input).values()
df = spark.read.json(tweet_json_rdd)

# Confirm we got a DataFrame, then print its first rows as a text table.
print(type(df))
df.show()

<class 'pyspark.sql.dataframe.DataFrame'>
+------------+-----------+--------------------+--------------------+---------+----+-----------------+-----------------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+-----+-------------+---------+--------------------+--------------------+--------------------+---------+--------------------+
|contributors|coordinates|          created_at|            entities|favorited| geo|               id|           id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_reply_to_user_id_str|place|retweet_count|retweeted|    retweeted_status|              source|                text|truncated|                user|
+------------+-----------+--------------------+--------------------+---------+----+-----------------+-----------------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+---

In [10]:
# Register the DataFrame as a SQL temporary view so it can be queried by name.
df.createOrReplaceTempView("tweet")

# Query 1: who tweeted, how many followers they have, and the tweet itself.
q1 = spark.sql('select user.screen_name, user.followers_count, text, retweet_count from tweet')
# collect() (action) - return all the elements of the dataset as a list at the driver program.
print(q1.collect())
# show() prints the table itself and returns None, so it must not be wrapped
# in print() (doing so emits a stray "None" line after the table).
q1.show()


# Query 2: when the tweet was created and which client posted it.
q2 = spark.sql('select created_at, source from tweet')
# collect() (action) - return all the elements of the dataset as a list at the driver program.
print(q2.collect())
q2.show()

[Row(screen_name='OldGREG85', followers_count=48, text='RT @PostGradProblem: In preparation for the NFL lockout, I will be spending twice as much time analyzing my fantasy baseball team during ...', retweet_count=4)]
+-----------+---------------+--------------------+-------------+
|screen_name|followers_count|                text|retweet_count|
+-----------+---------------+--------------------+-------------+
|  OldGREG85|             48|RT @PostGradProbl...|            4|
+-----------+---------------+--------------------+-------------+

None
[Row(created_at='Sun Apr 03 23:48:36 +0000 2011', source='<a href="http://twitter.com/" rel="nofollow">Twitter for iPhone</a>')]
+--------------------+--------------------+
|          created_at|              source|
+--------------------+--------------------+
|Sun Apr 03 23:48:...|<a href="http://t...|
+--------------------+--------------------+

None
