In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [4]:
# Obtain the Spark session for this notebook via the builder's getOrCreate(),
# with master "local[1]" and the application name set below.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
)

# Word list (with repeated entries) used as input for the RDD examples.
data = [
    "Project", "Gutenberg’s", "Alice’s", "Adventures",
    "in", "Wonderland", "Project", "Gutenberg’s", "Adventures",
    "in", "Wonderland", "Project", "Gutenberg’s",
]

# Turn the Python list into an RDD. Printing the RDD shows its description
# (type and id), not the elements it holds.
rdd = spark.sparkContext.parallelize(data)
print(rdd)

ParallelCollectionRDD[2] at readRDDFromFile at PythonRDD.scala:289


In [5]:
# Pair every word with the number 1, producing (word, 1) tuples.
rdd2 = rdd.map(lambda word: (word, 1))

# Collect the pairs and print them one per line.
for pair in rdd2.collect():
    print(pair)

('Project', 1)
('Gutenberg’s', 1)
('Alice’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)


In [7]:
# Three sample rows, one tuple per person, in the column order listed in `columns`.
data = [
    ("James", "Smith", "M", 30),
    ("Anna", "Rose", "F", 41),
    ("Robert", "Williams", "M", 62),
]
columns = ["firstname", "lastname", "gender", "salary"]

# Build a DataFrame from the tuples, using `columns` as the column names.
df = spark.createDataFrame(data=data, schema=columns)
df.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|    30|
|     Anna|    Rose|     F|    41|
|   Robert|Williams|     M|    62|
+---------+--------+------+------+



In [8]:
# Row-wise transform on the DataFrame's RDD: join the first two fields with a
# comma, keep the third field, and double the fourth.
rdd2 = df.rdd.map(
    lambda row: (row[0] + "," + row[1], row[2], row[3] * 2)
)

# Convert the resulting tuples back into a DataFrame with explicit column names.
df2 = rdd2.toDF(["name", "gender", "new_salary"])
df2.show()

+---------------+------+----------+
|           name|gender|new_salary|
+---------------+------+----------+
|    James,Smith|     M|        60|
|      Anna,Rose|     F|        82|
|Robert,Williams|     M|       124|
+---------------+------+----------+



In [9]:
# Same transformation as the previous cell (it rebuilds rdd2 and df2 with the
# same contents), written here with named Row fields.
rdd2 = df.rdd.map(
    lambda row: (
        row["firstname"] + "," + row["lastname"],
        row["gender"],
        row["salary"] * 2,
    )
)
df2 = rdd2.toDF(["name", "gender", "new_salary"])
df2.show()

+---------------+------+----------+
|           name|gender|new_salary|
+---------------+------+----------+
|    James,Smith|     M|        60|
|      Anna,Rose|     F|        82|
|Robert,Williams|     M|       124|
+---------------+------+----------+



In [10]:
# Print each transformed tuple held by rdd2 on its own line.
for record in rdd2.collect():
    print(record)

('James,Smith', 'M', 60)
('Anna,Rose', 'F', 82)
('Robert,Williams', 'M', 124)


In [11]:
# Sample employees as (emp_id, emp_name, dept_id). Raj has no dept_id, and
# Nina's dept_id (4) does not appear in the departments list.
data_employees = [
    (1, "John", 1),
    (2, "Emma", 2),
    (3, "Raj", None),
    (4, "Nina", 4),
]
# Sample departments as (dept_id, dept_name). "Temp" has no dept_id.
data_departments = [
    (1, "HR"),
    (2, "Tech"),
    (3, "Marketing"),
    (None, "Temp"),
]
columns_employees = ["emp_id", "emp_name", "dept_id"]
columns_departments = ["dept_id", "dept_name"]

df_employees = spark.createDataFrame(data_employees, columns_employees)
df_departments = spark.createDataFrame(data_departments, columns_departments)

# Join on equal dept_id with no explicit join type. The result keeps the
# dept_id column from both sides.
df_joined = df_employees.join(
    df_departments,
    df_employees["dept_id"] == df_departments["dept_id"],
)

# Display the joined rows.
df_joined.show()

+------+--------+-------+-------+---------+
|emp_id|emp_name|dept_id|dept_id|dept_name|
+------+--------+-------+-------+---------+
|     1|    John|      1|      1|       HR|
|     2|    Emma|      2|      2|     Tech|
+------+--------+-------+-------+---------+



In [12]:
# Cross join: pairs every employee row with every department row.
df_cross_joined = df_employees.crossJoin(df_departments)
df_cross_joined.show()

# Join on equal dept_id using the "left" join type.
df_leftjoined = df_employees.join(
    df_departments,
    on=df_employees["dept_id"] == df_departments["dept_id"],
    how="left",
)

# Display the left-join result.
df_leftjoined.show()


+------+--------+-------+-------+---------+
|emp_id|emp_name|dept_id|dept_id|dept_name|
+------+--------+-------+-------+---------+
|     1|    John|      1|      1|       HR|
|     1|    John|      1|      2|     Tech|
|     1|    John|      1|      3|Marketing|
|     1|    John|      1|   NULL|     Temp|
|     2|    Emma|      2|      1|       HR|
|     2|    Emma|      2|      2|     Tech|
|     2|    Emma|      2|      3|Marketing|
|     2|    Emma|      2|   NULL|     Temp|
|     3|     Raj|   NULL|      1|       HR|
|     3|     Raj|   NULL|      2|     Tech|
|     3|     Raj|   NULL|      3|Marketing|
|     3|     Raj|   NULL|   NULL|     Temp|
|     4|    Nina|      4|      1|       HR|
|     4|    Nina|      4|      2|     Tech|
|     4|    Nina|      4|      3|Marketing|
|     4|    Nina|      4|   NULL|     Temp|
+------+--------+-------+-------+---------+

+------+--------+-------+-------+---------+
|emp_id|emp_name|dept_id|dept_id|dept_name|
+------+--------+-------+------

In [13]:
# Join on equal dept_id using the "right" join type: departments without a
# matching employee appear with NULL employee columns.
df_rightjoined = df_employees.join(
    df_departments,
    on=df_employees["dept_id"] == df_departments["dept_id"],
    how="right",
)

# Display the right-join result.
df_rightjoined.show()


+------+--------+-------+-------+---------+
|emp_id|emp_name|dept_id|dept_id|dept_name|
+------+--------+-------+-------+---------+
|  NULL|    NULL|   NULL|   NULL|     Temp|
|     1|    John|      1|      1|       HR|
|  NULL|    NULL|   NULL|      3|Marketing|
|     2|    Emma|      2|      2|     Tech|
+------+--------+-------+-------+---------+



In [14]:
# Sample users as (id, name) and purchases as (user_id, item).
# user_id 5 in the purchases does not correspond to any listed user.
data_users = [
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie"),
    (4, "David"),
]
data_purchases = [
    (1, "Book"),
    (2, "Pen"),
    (5, "Notebook"),
]

columns_users = ["id", "name"]
columns_purchases = ["user_id", "item"]

df_users = spark.createDataFrame(data_users, columns_users)
df_purchases = spark.createDataFrame(data_purchases, columns_purchases)

# "left_semi" join: keeps the users that have a matching purchase, and only
# the user columns appear in the result.
df_purchasers = df_users.join(
    df_purchases,
    on=df_users["id"] == df_purchases["user_id"],
    how="left_semi",
)

# Display the users with at least one purchase.
df_purchasers.show()

+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+



In [15]:
# "left_anti" join: keeps only the users that have NO matching purchase.
# The result gets its own name so it does not overwrite `df_purchasers`
# (the left_semi result) with rows that mean the opposite; that keeps both
# results available regardless of the order the cells are run in.
df_non_purchasers = df_users.join(
    df_purchases,
    df_users.id == df_purchases.user_id,
    "left_anti",
)

# Display the users without any purchase.
df_non_purchasers.show()


+---+-------+
| id|   name|
+---+-------+
|  3|Charlie|
|  4|  David|
+---+-------+

