The first step in any Apache Spark program is to create a SparkContext. A SparkContext is required to execute operations on a cluster: it tells Spark how and where to access the cluster, so creating it is the first step in connecting to a Spark cluster. If you are using the Spark shell, you will notice that one has already been created for you (as sc). Otherwise, we create the SparkContext by importing it, initializing it and, optionally, providing configuration settings. For example,

In [1]:
from pyspark import SparkContext
sc = SparkContext()

In [2]:
#Creating a SQLContext (kept for backward compatibility; SparkSession is the entry point since Spark 2.0)
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

Creating a DataFrame from an RDD
I follow these steps to create a DataFrame from a list of tuples:

Create a list of tuples, each holding a person's name and age.
Create an RDD from that list.
Convert each tuple to a Row.
Create a DataFrame by calling createDataFrame on the RDD via sqlContext.
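Spark's Row behaves much like a Python named tuple: a tuple whose fields can also be accessed by name. As a rough, plain-Python analogy of the "convert each tuple to a Row" step (collections.namedtuple stands in for Row here; this is not Spark code):

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row (an analogy only): a tuple whose fields have names
Person = namedtuple("Person", ["name", "age"])

l = [('Ankit', 25), ('Jalfaizy', 22), ('saurabh', 20), ('Bala', 26)]

# Same per-element function as rdd.map(lambda x: Row(name=x[0], age=int(x[1]))),
# but applied eagerly to a local list instead of lazily to an RDD
people = [Person(name=x[0], age=int(x[1])) for x in l]

for p in people:
    print(p.name, p.age)  # fields are accessible by name...
print(people[0][0])       # ...and by position, as with Row
```

The difference in Spark is that rdd.map only records the transformation; nothing is computed until an action (or createDataFrame, which needs the data) runs.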

In [3]:
from pyspark.sql import Row
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = sqlContext.createDataFrame(people)

In [5]:
type(schemaPeople)

pyspark.sql.dataframe.DataFrame

In [None]:
#Method 1: call toDF() on an RDD of tuples and pass the column names

In [6]:
rdd = sc.parallelize([(1,2,3),(4,5,6),(7,8,9)])

In [7]:
df = rdd.toDF(["a","b","c"])

In [9]:
df.show(4)

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  7|  8|  9|
+---+---+---+



In [10]:
#Method 2: call toDF() on an RDD of Row objects; the column names come from the Row fields

In [11]:
from pyspark.sql import Row

In [12]:
rdd = sc.parallelize([Row(a=1,b=2,c=3),Row(a=4,b=5,c=6),Row(a=7,b=8,c=9)])

In [13]:
df = rdd.toDF()

In [14]:
df.show(3)

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  7|  8|  9|
+---+---+---+



In [15]:
sc.stop()

#https://gokhanatil.com/2018/04/pyspark-examples-1-grouping-data-from-csv-file-using-rdds.html

In [4]:
from pyspark import SparkContext

In [5]:
sc = SparkContext.getOrCreate()

In [6]:
# Count the lines for each value of the third '|'-separated field
k = sc.textFile("/Users/spillai/Downloads/user.txt") \
    .map(lambda x: (x.split('|')[2], 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .collect()
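The chain above counts how many lines share each value of the third pipe-separated field. To make the map/reduceByKey semantics concrete, here is a plain-Python model (not Spark code) run on a few hypothetical sample lines in the same pipe-delimited layout; the helper reduce_by_key is ours, not a Spark API:

```python
from functools import reduce

# Hypothetical pipe-delimited sample lines, for illustration only
lines = [
    "1|24|M|technician|85711",
    "2|53|F|other|94043",
    "3|23|M|writer|32067",
]

# map step: emit (third field, 1) for every line
pairs = [(line.split('|')[2], 1) for line in lines]

# reduceByKey step: merge all values of the same key with the given function
def reduce_by_key(pairs, func):
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return [(key, reduce(func, values)) for key, values in grouped.items()]

counts = reduce_by_key(pairs, lambda x, y: x + y)
print(counts)
```

Two differences from Spark matter in practice. Spark applies the function within each partition first and then across partitions, which is why reduceByKey needs an associative and commutative function such as addition. And the order of the collected pairs is not guaranteed, so k[0] is simply whichever key happens to come first.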

In [7]:
sc.stop()

In [12]:
k[0][1]

670

In [1]:
from pyspark.sql import SparkSession

In [2]:
# "spark.some.config.option" is a placeholder, not a real Spark setting; replace or drop it
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [63]:
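# Load the MovieLens CSV file into a DataFrame: ';'-delimited, first line is the header.
# Without inferSchema (or an explicit schema), every column is read as a string.
data = spark.read.format("csv") \
    .option("delimiter", ";") \
    .option("header", True) \
    .load("/Users/spillai/Downloads/movielens.csv")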
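The two reader options map closely onto Python's own csv module: the delimiter option sets the field separator, and header=True takes column names from the first line. Like Spark's CSV reader without inferSchema, csv.DictReader returns every value as a string. A plain-Python sketch on a small sample assembled from the rows shown in this notebook (not Spark code):

```python
import csv
import io

# Sample in the same layout: ';' delimiter, header row first
sample = (
    "movieId;title;genres;userId;rating;timestamp\n"
    "1;Toy Story (1995);Adventure|Animation|Children|Comedy|Fantasy;7;3.0;851866703\n"
)

# DictReader takes field names from the header row, like option("header", True)
reader = csv.DictReader(io.StringIO(sample), delimiter=";")
rows = list(reader)

# No type inference: the rating is the string "3.0", not the float 3.0
print(rows[0]["rating"], type(rows[0]["rating"]))
```

Note that the '|' characters inside the genres field are untouched because only ';' is treated as a separator.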

In [64]:
data.show(3)

+-------+--------------------+--------------------+------+------+----------+
|movieId|               title|              genres|userId|rating| timestamp|
+-------+--------------------+--------------------+------+------+----------+
|      1|    Toy Story (1995)|Adventure|Animati...|     7|   3.0| 851866703|
|      2|      Jumanji (1995)|Adventure|Childre...|    15|   2.0|1134521380|
|      3|Grumpier Old Men ...|      Comedy|Romance|     5|   4.0|1163374957|
+-------+--------------------+--------------------+------+------+----------+
only showing top 3 rows



In [65]:
data.select("title", "rating").show(3)

+--------------------+------+
|               title|rating|
+--------------------+------+
|    Toy Story (1995)|   3.0|
|      Jumanji (1995)|   2.0|
|Grumpier Old Men ...|   4.0|
+--------------------+------+
only showing top 3 rows



In [66]:
data.filter(data['rating'] < 3.0).show(3)

+-------+--------------------+--------------------+------+------+----------+
|movieId|               title|              genres|userId|rating| timestamp|
+-------+--------------------+--------------------+------+------+----------+
|      2|      Jumanji (1995)|Adventure|Childre...|    15|   2.0|1134521380|
|     11|American Presiden...|Comedy|Drama|Romance|    15|   2.5|1093028381|
|     14|        Nixon (1995)|               Drama|    15|   2.5|1166586286|
+-------+--------------------+--------------------+------+------+----------+
only showing top 3 rows



In [67]:
#Group by rating, count the rows in each group and sort by rating
data.groupBy(data['rating']).count().orderBy('rating').show(2)

+------+-----+
|rating|count|
+------+-----+
|   0.5|  227|
|   1.0|  574|
+------+-----+
only showing top 2 rows
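groupBy('rating').count() produces one row per distinct rating with the number of matching rows, and orderBy('rating') sorts those rows. The same computation in plain Python, on made-up ratings (not Spark code):

```python
from collections import Counter

# Made-up ratings, for illustration only
ratings = [3.0, 2.0, 4.0, 2.5, 2.5, 0.5, 1.0, 4.0]

# groupBy('rating').count(): number of rows per distinct rating
counts = Counter(ratings)

# orderBy('rating'): sort the (rating, count) pairs by rating, ascending
ordered = sorted(counts.items())

for rating, count in ordered[:2]:  # like show(2)
    print(rating, count)

# The CSV above was read without inferSchema, so 'rating' is a string column and
# orderBy sorts it lexicographically; for ratings between 0.5 and 5.0
# (one digit before the point) that happens to match numeric order
as_strings = sorted(str(r) for r in counts)
print(as_strings)
```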



In [68]:
#To query the DataFrame with SQL, first register it as a temporary view
data.createOrReplaceTempView("movielens")

In [69]:
spark.sql("select * from movielens where rating < 3").show(3)

+-------+--------------------+--------------------+------+------+----------+
|movieId|               title|              genres|userId|rating| timestamp|
+-------+--------------------+--------------------+------+------+----------+
|      2|      Jumanji (1995)|Adventure|Childre...|    15|   2.0|1134521380|
|     11|American Presiden...|Comedy|Drama|Romance|    15|   2.5|1093028381|
|     14|        Nixon (1995)|               Drama|    15|   2.5|1166586286|
+-------+--------------------+--------------------+------+------+----------+
only showing top 3 rows



Now, let’s create a DataFrame from an RDD. We’ll use the same dataset, but this time we will load it as a text file (without a header). For simplicity, we keep only three columns: we load the data into an RDD, split each line on semicolons and select the first three fields of each row.
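The per-line work is an ordinary Python function, so it can be written and checked on its own before being passed to map. A minimal sketch (parse_movie is a hypothetical helper name, not from the original), tested on a sample line assembled from values shown earlier:

```python
def parse_movie(line):
    """Split one ';'-separated line and keep (id, title, genres)."""
    splits = line.split(";")
    # Same projection as the RDD code: cast the id to int, keep title and genres as text
    return (int(splits[0]), splits[1], splits[2])

# Sample line in the same layout as the CSV rows above (no header)
line = "2;Jumanji (1995);Adventure|Children|Fantasy;15;2.0;1134521380"
print(parse_movie(line))
```

If the file did start with a header line, int(splits[0]) would raise ValueError on it, so such a line would have to be filtered out before this step.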

In [3]:
#Get the existing SparkContext (or create one) first
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [4]:
# Load the text file, split each line on ';' and keep (id, title, genres)
rdd = sc.textFile("/Users/spillai/Downloads/movielens.txt") \
    .map(lambda line: line.split(";")) \
    .map(lambda splits: (int(splits[0]), splits[1], splits[2]))

In [5]:
#Then, take a look at the contents of rdd
for elem in rdd.take(5):
    print(elem)

(1, 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')
(2, 'Jumanji (1995)', 'Adventure|Children|Fantasy')
(3, 'Grumpier Old Men (1995)', 'Comedy|Romance')
(4, 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance')
(5, 'Father of the Bride Part II (1995)', 'Comedy')


Next, we import the type classes, create a field with a specific type for each column, and combine the fields into the schema itself.
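Each StructField pairs a column name with a data type and a nullable flag (the third argument, True here). By default, createDataFrame checks incoming rows against the schema (its verifySchema parameter defaults to True). The plain-Python model below is a much-simplified, hypothetical version of such a check that maps IntegerType and StringType to Python int and str; it is not Spark's implementation:

```python
# Simplified model of the schema: (column name, Python type, nullable)
schema = [("id", int, True), ("title", str, True), ("genres", str, True)]

def conforms(row, schema):
    """Return True if the row has one value per field, each of the expected type."""
    if len(row) != len(schema):
        return False
    for value, (_name, expected_type, nullable) in zip(row, schema):
        if value is None:
            if not nullable:
                return False
        elif not isinstance(value, expected_type):
            return False
    return True

print(conforms((1, "Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy"), schema))
print(conforms(("1", "Toy Story (1995)", "Comedy"), schema))  # id left as a string
```

The second row fails because the id was never cast to int, which is exactly why the RDD code above converts splits[0] with int().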

In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
id_field = StructField("id", IntegerType(), True)
title_field = StructField("title", StringType(), True)
genres_field = StructField("genres", StringType(), True)
schema = StructType([id_field, title_field, genres_field])

In [7]:
movielens = spark.createDataFrame(rdd, schema)
movielens.show(3)

+---+--------------------+--------------------+
| id|               title|              genres|
+---+--------------------+--------------------+
|  1|    Toy Story (1995)|Adventure|Animati...|
|  2|      Jumanji (1995)|Adventure|Childre...|
|  3|Grumpier Old Men ...|      Comedy|Romance|
+---+--------------------+--------------------+
only showing top 3 rows



Loading another MovieLens dataset

In [13]:
# Each line is userId::movieId::rating::timestamp
rdd2 = sc.textFile("/Users/spillai/Downloads/sample_movielens_ratings.txt") \
    .map(lambda line: line.split("::")) \
    .map(lambda splits: (int(splits[0]), int(splits[1]), float(splits[2]), int(splits[3])))

In [14]:
#Then, take a look at the contents of rdd2
for elem in rdd2.take(5):
    print(elem)

(0, 2, 3.0, 1424380312)
(0, 3, 1.0, 1424380312)
(0, 5, 2.0, 1424380312)
(0, 9, 4.0, 1424380312)
(0, 11, 1.0, 1424380312)


In [18]:
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, LongType
userId_field = StructField("userId", IntegerType(), True)
movieId_field = StructField("movieId", IntegerType(), True)
rating_field = StructField("rating", FloatType(), True)
timestamp_field = StructField("timestamp", LongType(), True)
schema = StructType([userId_field, movieId_field, rating_field, timestamp_field])
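Here the timestamp field uses LongType (a 64-bit signed integer) rather than IntegerType (32-bit signed). The sample timestamps such as 1424380312 would still fit in 32 bits, but Unix timestamps in seconds exceed the 32-bit maximum in January 2038, which makes the 64-bit type the safer choice. A quick plain-Python check of the range (fits_int32 is a hypothetical helper):

```python
# Range of a signed 32-bit integer (Spark's IntegerType); LongType is signed 64-bit
INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1

def fits_int32(value):
    return INT32_MIN <= value <= INT32_MAX

print(fits_int32(1424380312))  # timestamp from the sample rows: fits
print(fits_int32(2**31))       # one second past the 32-bit limit (19 January 2038 UTC)
```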

In [19]:
movielens2 = spark.createDataFrame(rdd2, schema)

In [20]:
movielens2.show(2)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|      2|   3.0|1424380312|
|     0|      3|   1.0|1424380312|
+------+-------+------+----------+
only showing top 2 rows

