In [3]:
from pyspark import SparkContext, SparkConf



In [4]:
# Create the SparkContext used by the RDD cells that follow; it is stopped later with sc.stop().
sc = SparkContext()

In [5]:
# Produce an RDD holding the natural numbers 1 through 14.
# range's stop value is exclusive, so range(1, 15) yields 14 numbers (not 15).
# list(...) is kept deliberately: passing a bare range makes PySpark build a
# different RDD subclass, which would change what type(rdd) reports.
rdd = sc.parallelize(list(range(1, 15)))
# Action: bring every element back to the driver for display.
rdd.collect()


[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

In [6]:
# Display the Python class of the RDD object created above
rdd.__class__

In [7]:
# Print the RDD's contents, then display how many partitions hold them
contents = rdd.collect()
print(contents)
rdd.getNumPartitions()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]


2

In [8]:
# Action: fetch only the first element of the RDD

first_element = rdd.first()
first_element

1

In [9]:
# Transformation: filter keeps only the even elements in a new RDD; collect displays them

even_rdd = rdd.filter(lambda n: n % 2 == 0)
even_rdd.collect()

[2, 4, 6, 8, 10, 12, 14]

In [10]:
# Transformation: map squares every element into a new RDD; collect displays the result

squared = rdd.map(lambda n: n ** 2)
squared.collect()


[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]

In [11]:
# Action: reduce combines all elements pairwise with + into a single value.
# The result is stored as `total` rather than `sum` so the built-in sum()
# is not shadowed for the rest of the kernel session.

total = rdd.reduce(lambda x, y: x + y)
print(total)


105


In [None]:
# Mount Google Drive at /content/gdrive (requires a Colab runtime: google.colab)
from google.colab import drive

GDRIVE_MOUNT_POINT = '/content/gdrive'
drive.mount(GDRIVE_MOUNT_POINT)

Mounted at /content/gdrive


In [12]:
# Save the RDD as text. saveAsTextFile writes a directory of part-files at this
# path (despite the .txt suffix) and raises if that directory already exists,
# so any previous output is removed first to keep the cell re-runnable.
# Assumes Spark's default filesystem is the local disk — TODO confirm.
# NOTE(review): this path is not under the Drive mount point (/content/gdrive),
# so the output is not written to Google Drive — confirm that is intended.
import shutil

RDD_DATA_PATH = '/content/BDA/rdd_data.txt'
shutil.rmtree(RDD_DATA_PATH, ignore_errors=True)
rdd.saveAsTextFile(RDD_DATA_PATH)

In [13]:
# Build two small RDDs (1..5 and 6..10) and combine them with the union transformation

rdd1 = sc.parallelize(list(range(1, 6)))
rdd2 = sc.parallelize(list(range(6, 11)))
rdd3 = rdd1.union(rdd2)
rdd3.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [14]:
# cartesian pairs every element of rdd1 with every element of rdd2, giving an
# RDD of (a, b) tuples; collect brings all pairs back to the driver

rdd4 = rdd1.cartesian(rdd2)
pairs = rdd4.collect()
pairs

[(1, 6),
 (1, 7),
 (2, 6),
 (2, 7),
 (1, 8),
 (1, 9),
 (2, 8),
 (2, 9),
 (1, 10),
 (2, 10),
 (3, 6),
 (3, 7),
 (4, 6),
 (4, 7),
 (5, 6),
 (5, 7),
 (3, 8),
 (3, 9),
 (4, 8),
 (4, 9),
 (3, 10),
 (4, 10),
 (5, 8),
 (5, 9),
 (5, 10)]

In [15]:
# Create a pair RDD: a list of (key, value) tuples — not a dictionary.
# Keys may repeat; here 'a' appears twice.

rdd5 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 1)])
rdd5.collect()

[('a', 1), ('b', 2), ('c', 3), ('a', 1)]

In [16]:
# countByKey reports, for each distinct key, how many pairs carry that key
# (the values themselves are ignored); the result is returned to the driver
# as a defaultdict

key_counts = rdd5.countByKey()
key_counts

defaultdict(int, {'a': 2, 'b': 1, 'c': 1})

In [20]:
# Write two RDDs out as text, then read both back and union them into a single
# RDD of lines (each line is the string form of an original element).
# saveAsTextFile raises if its output directory already exists, so stale output
# is removed first to keep the cell re-runnable.
# Assumes Spark's default filesystem is the local disk — TODO confirm.
import shutil

FILE1_PATH = '/content/BDA/file1.txt'
FILE2_PATH = '/content/BDA/file2.txt'
for out_dir in (FILE1_PATH, FILE2_PATH):
    shutil.rmtree(out_dir, ignore_errors=True)

rdd4.saveAsTextFile(FILE1_PATH)
rdd5.saveAsTextFile(FILE2_PATH)
rdd6 = sc.textFile(FILE1_PATH).union(sc.textFile(FILE2_PATH))
rdd6.collect()



['(1, 8)',
 '(1, 9)',
 '(2, 8)',
 '(2, 9)',
 '(1, 10)',
 '(2, 10)',
 '(3, 6)',
 '(3, 7)',
 '(4, 6)',
 '(4, 7)',
 '(5, 6)',
 '(5, 7)',
 '(1, 6)',
 '(1, 7)',
 '(2, 6)',
 '(2, 7)',
 '(3, 8)',
 '(3, 9)',
 '(4, 8)',
 '(4, 9)',
 '(3, 10)',
 '(4, 10)',
 '(5, 8)',
 '(5, 9)',
 '(5, 10)',
 "('c', 3)",
 "('a', 1)",
 "('a', 1)",
 "('b', 2)"]

In [21]:
# Peek at the first five lines of the combined RDD without collecting all of it

first_five = rdd6.take(5)
first_five

['(1, 8)', '(1, 9)', '(2, 8)', '(2, 9)', '(1, 10)']

In [27]:
# Create a DataFrame from an RDD of tuples using toDF with column names.
# Note: PySpark exposes no typed Dataset API (that exists only in Scala/Java),
# so this cell produces a DataFrame only.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()


columns = ["col1", "col2"]
data = [("Java", 2000), ("Python", 10000), ("Scala", 3000)]

# Reuses the session's existing SparkContext to distribute the tuples.
rdd = spark.sparkContext.parallelize(data)

# Column types are inferred from the data (col2 comes out as long).
dfFromRDD = rdd.toDF(columns)
dfFromRDD.printSchema()
rdd.collect()

root
 |-- col1: string (nullable = true)
 |-- col2: long (nullable = true)



[('Java', 2000), ('Python', 10000), ('Scala', 3000)]

In [39]:
# Stop the SparkContext created at the top of the notebook so the next cell can construct a new one with SparkContext().
sc.stop()

In [40]:
# Build a DataFrame from an RDD using an explicit schema instead of type inference.
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

sc = SparkContext()
# try/finally guarantees the context is stopped even if a step below fails;
# otherwise it would stay running and the next SparkContext() call would fail.
try:
    spark = SparkSession(sc)

    data = [("Java", 2000), ("Python", 10000), ("Scala", 3000)]
    rdd = sc.parallelize(data)

    # Declared types override inference: Col2 becomes integer rather than long.
    schema = StructType([
        StructField("Col1", StringType(), True),
        StructField("Col2", IntegerType(), True),
    ])

    df = spark.createDataFrame(rdd, schema)
    df.printSchema()
    df.show()
finally:
    sc.stop()

root
 |-- Col1: string (nullable = true)
 |-- Col2: integer (nullable = true)

+------+-----+
|  Col1| Col2|
+------+-----+
|  Java| 2000|
|Python|10000|
| Scala| 3000|
+------+-----+



In [46]:
# Compare the RDD and DataFrame APIs with small examples, starting with an RDD.
# (PySpark has no typed Dataset API; object-based data is still turned into a DataFrame.)

from pyspark import SparkContext

sc = SparkContext()
# try/finally ensures the context is stopped even if the job fails, so the next
# cell is not blocked by a still-running SparkContext.
try:
    data = [1, 2, 3, 4, 5]
    rdd = sc.parallelize(data)

    # Perform a transformation (map) and an action (collect)
    squared_rdd = rdd.map(lambda x: x ** 2)
    result = squared_rdd.collect()
    print("RDD example:", result)
finally:
    sc.stop()

RDD example: [1, 4, 9, 16, 25]


In [48]:
# DataFrame example: named columns built directly from a list of tuples.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()
try:
    data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
    df = spark.createDataFrame(data, ["Name", "Id"])
    print("DataFrame example:")
    df.show()
finally:
    # Stop the session even if a step above fails, so no context is left running.
    spark.stop()

DataFrame example:
+-----+---+
| Name| Id|
+-----+---+
|Alice|  1|
|  Bob|  2|
|Cathy|  3|
+-----+---+



In [50]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
# Create (or reuse) a SparkSession for the object-based example
spark = (
    SparkSession.builder
    .appName("Dataset Example")
    .getOrCreate()
)
class Person:
    """Plain record holding a name and an id.

    Instances are used as row objects for spark.createDataFrame; their
    attributes (name, id) become the DataFrame's columns.
    """

    def __init__(self, name, id):
        # Tuple assignment binds name first, then id (left-to-right).
        self.name, self.id = name, id

# Build a DataFrame from an RDD of Person objects; the columns come from the
# objects' attributes. try/finally stops the session even if schema inference
# or show() fails, so no SparkContext is left running.
try:
    person_rdd = spark.sparkContext.parallelize([Person("Alice", 1), Person("Bob", 2), Person("Cathy", 3)])
    person_df = spark.createDataFrame(person_rdd)
    print("DataFrame created from RDD of Person objects:")
    person_df.show()
finally:
    spark.stop()


DataFrame created from RDD of Person objects:
+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
|  3|Cathy|
+---+-----+

