In [3]:
# %pip install pyspark  # uncomment if PySpark is missing; %pip installs into this kernel's own environment

In [4]:
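# SparkContext - low-level entry point, used to create and work with RDDs
# SparkSession - unified entry point to PySpark's DataFrame / SQL functionality
    # it was introduced in version 2.0
    # replaced SQLContext and HiveContext; it wraps a SparkContext (spark.sparkContext)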

from pyspark.sql import SparkSession

In [5]:
import os, sys

# Point Spark's Python processes at the same interpreter that runs this
# kernel, so driver and workers agree on Python version and packages.
# This must happen before the SparkSession is started.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [6]:
# master() - tells Spark which cluster manager to run on. On a real cluster
# this is the manager's URL, e.g. "yarn", "k8s://https://<host>:<port>", or
# "spark://<host>:<port>" for Spark's own *standalone* cluster manager
# (Mesos support is deprecated since Spark 3.2).

# local[x] - "local mode": driver and executor run inside this one process
# using x worker threads ("local[*]" = one thread per core). This is not the
# same thing as standalone mode. x also becomes the default parallelism, so
# parallelize() without numSlices creates x partitions (1 with local[1]).
SPARK_MASTER = "local[1]"
APP_NAME = "testApp"

# getOrCreate() reuses an already-running session in this kernel, so
# re-running this cell does not start a second one.
spark = (
    SparkSession.builder
    .master(SPARK_MASTER)
    .appName(APP_NAME)
    .getOrCreate()
)

25/04/08 20:02:27 WARN Utils: Your hostname, Ravikants-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.10 instead (on interface en0)
25/04/08 20:02:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/08 20:02:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
# print() shows only str(spark), the plain object repr with its memory address;
# the bare `spark` expression in the next cell lets Jupyter render its display instead.
print(spark)

<pyspark.sql.session.SparkSession object at 0x17d12bd70>


In [8]:
# Bare last expression: Jupyter displays the session object as the cell output.
spark

In [9]:
# Sample employee records as plain Python tuples, one per employee,
# laid out as: (id, name, salary, gender, leaves)
dataset = [
    (101, "John",    56000,  "Male",   2),
    (102, "Sam",     65000,  "Male",   1),
    (103, "Max",     50000,  "Male",   3),
    (104, "Nick",    80000,  "Male",   0),
    (105, "Sneha",   25000,  "Female", 5),
    (106, "Neha",    50000,  "Female", 1),
    (107, "Monica",  30000,  "Female", 2),
    (108, "Adam",    150000, "Male",   3),
    (109, "Alex",    100000, "Male",   3),
    (110, "Smith",   90000,  "Male",   1),
]

In [10]:
# RDD - Resilient Distributed Dataset
# - PySpark's fundamental, fault-tolerant data structure
# - a distributed collection of objects, split into partitions
# - immutable - transformations return a new RDD instead of changing this one

# Distribute the local Python list as an RDD. numSlices=None (the default)
# means "use the context's default parallelism", i.e. 1 partition under local[1].
rdd = spark.sparkContext.parallelize(dataset, numSlices=None)

In [11]:
# Shows the RDD's repr: its id and the call site that created it.
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

In [12]:
# Number of partitions the data was split into: 1 here, because the session
# uses master "local[1]" and parallelize() was given no explicit numSlices.
rdd.getNumPartitions()

1

In [13]:
# collect() is an action that brings every element back to the driver as a
# Python list; fine for 10 rows, but avoid it on large RDDs.
rdd.collect()

[(101, 'John', 56000, 'Male', 2),
 (102, 'Sam', 65000, 'Male', 1),
 (103, 'Max', 50000, 'Male', 3),
 (104, 'Nick', 80000, 'Male', 0),
 (105, 'Sneha', 25000, 'Female', 5),
 (106, 'Neha', 50000, 'Female', 1),
 (107, 'Monica', 30000, 'Female', 2),
 (108, 'Adam', 150000, 'Male', 3),
 (109, 'Alex', 100000, 'Male', 3),
 (110, 'Smith', 90000, 'Male', 1)]

In [15]:
# Without column names, toDF() falls back to positional names _1 ... _5.
rdd.toDF().show()

+---+------+------+------+---+
| _1|    _2|    _3|    _4| _5|
+---+------+------+------+---+
|101|  John| 56000|  Male|  2|
|102|   Sam| 65000|  Male|  1|
|103|   Max| 50000|  Male|  3|
|104|  Nick| 80000|  Male|  0|
|105| Sneha| 25000|Female|  5|
|106|  Neha| 50000|Female|  1|
|107|Monica| 30000|Female|  2|
|108|  Adam|150000|  Male|  3|
|109|  Alex|100000|  Male|  3|
|110| Smith| 90000|  Male|  1|
+---+------+------+------+---+



In [16]:
# Give the tuple positions readable names; column *types* are still inferred
# from the Python values (ints become long, strs become string).
columns = ["ID", "Name", "Salary", "Gender", "Leaves"]
df = rdd.toDF(schema=columns)

In [17]:
# Same rows as before, now labelled with the names from `columns`.
df.show()

+---+------+------+------+------+
| ID|  Name|Salary|Gender|Leaves|
+---+------+------+------+------+
|101|  John| 56000|  Male|     2|
|102|   Sam| 65000|  Male|     1|
|103|   Max| 50000|  Male|     3|
|104|  Nick| 80000|  Male|     0|
|105| Sneha| 25000|Female|     5|
|106|  Neha| 50000|Female|     1|
|107|Monica| 30000|Female|     2|
|108|  Adam|150000|  Male|     3|
|109|  Alex|100000|  Male|     3|
|110| Smith| 90000|  Male|     1|
+---+------+------+------+------+



In [18]:
# Inferred types: Python ints became long, Python strs became string.
df.printSchema()

root
 |-- ID: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Salary: long (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Leaves: long (nullable = true)



In [19]:
from pyspark.sql.types import StringType, IntegerType, StructField, StructType

In [20]:
# Explicit schema: renames the fields and stores the numeric columns as
# 32-bit IntegerType rather than the inferred LongType; every field keeps
# StructField's default nullable=True.
schema = StructType(
    [
        StructField(field_name, field_type)
        for field_name, field_type in (
            ("EmpId", IntegerType()),
            ("EmpName", StringType()),
            ("EmpSalary", IntegerType()),
            ("Gender", StringType()),
            ("EmpLeaves", IntegerType()),
        )
    ]
)

In [21]:
# Build the DataFrame straight from the Python list; the explicit schema
# supplies both column names and types, so no type inference takes place.
new_df = spark.createDataFrame(dataset, schema)

In [22]:
# Same rows as `df`; only the column names come from the explicit schema.
new_df.show()

+-----+-------+---------+------+---------+
|EmpId|EmpName|EmpSalary|Gender|EmpLeaves|
+-----+-------+---------+------+---------+
|  101|   John|    56000|  Male|        2|
|  102|    Sam|    65000|  Male|        1|
|  103|    Max|    50000|  Male|        3|
|  104|   Nick|    80000|  Male|        0|
|  105|  Sneha|    25000|Female|        5|
|  106|   Neha|    50000|Female|        1|
|  107| Monica|    30000|Female|        2|
|  108|   Adam|   150000|  Male|        3|
|  109|   Alex|   100000|  Male|        3|
|  110|  Smith|    90000|  Male|        1|
+-----+-------+---------+------+---------+



In [23]:
# Types now come from the explicit schema (integer, not the inferred long).
new_df.printSchema()

root
 |-- EmpId: integer (nullable = true)
 |-- EmpName: string (nullable = true)
 |-- EmpSalary: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- EmpLeaves: integer (nullable = true)



In [24]:
# count() is an action; it returns the number of rows as a Python int.
new_df.count()

10

In [25]:
# limit(3) builds a new 3-row DataFrame and show() prints all of it, so no
# "only showing top N rows" footer appears (unlike show(3) further down).
new_df.limit(3).show()

+-----+-------+---------+------+---------+
|EmpId|EmpName|EmpSalary|Gender|EmpLeaves|
+-----+-------+---------+------+---------+
|  101|   John|    56000|  Male|        2|
|  102|    Sam|    65000|  Male|        1|
|  103|    Max|    50000|  Male|        3|
+-----+-------+---------+------+---------+



In [26]:
# toPandas() collects the rows to the driver as a pandas DataFrame (pandas
# must be installed); limiting first keeps the transfer to 3 rows.
new_df.limit(3).toPandas()

Unnamed: 0,EmpId,EmpName,EmpSalary,Gender,EmpLeaves
0,101,John,56000,Male,2
1,102,Sam,65000,Male,1
2,103,Max,50000,Male,3


In [27]:
# Project two columns, then print only the first 3 rows.
new_df.select(["EmpName", "EmpSalary"]).show(n=3)

+-------+---------+
|EmpName|EmpSalary|
+-------+---------+
|   John|    56000|
|    Sam|    65000|
|    Max|    50000|
+-------+---------+
only showing top 3 rows

