Here is an example of how to use PySpark to do a few simple data manipulations and write the result to a CSV file. This notebook produces the same output as the Pandas and PETL examples.

I used the pyspark-notebook Docker image provided by Jupyter to run this notebook: https://hub.docker.com/r/jupyter/pyspark-notebook.

You can start the Docker container as follows:
docker run -v <local path to this notebook>:/home/jovyan/work -p 8888:8888 jupyter/pyspark-notebook
    
Author: PCA

In [1]:
# Imports: SparkSession plus the column functions and types used below
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper, udf
from pyspark.sql.types import StringType

In [2]:
# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("Join Example") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext

In [3]:
df_scores = spark.read.options(header='True', inferSchema='True').csv("inputfiles/players.csv")
df_scores.printSchema()
df_scores.show()

root
 |-- id: integer (nullable = true)
 |-- game_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- loggedin: string (nullable = true)

+---+-------+---------+-----+------+----------+
| id|game_id|     name|score|amount|  loggedin|
+---+-------+---------+-----+------+----------+
|  1|     10|Mega Claw|   10|    34|20-01-1980|
|  2|     10|   Pewter|   24|   223|23-02-1985|
|  3|     10|Sir Tiger|  500|  5632|11-05-1990|
|  4|     11|Sir Tiger|   50|    10|01-01-1901|
+---+-------+---------+-----+------+----------+



In [4]:
df_games = spark.read.options(header='True', inferSchema='True').csv("inputfiles/games.csv")
df_games.printSchema()
df_games.show()

root
 |-- game_id: integer (nullable = true)
 |-- game_name: string (nullable = true)
 |-- game_type: string (nullable = true)

+-------+----------+---------+
|game_id| game_name|game_type|
+-------+----------+---------+
|     10|Mouse Hunt|        R|
|     11|Bird Catch|        A|
|     12|Fast Racer|        S|
+-------+----------+---------+



In [5]:
df_join = df_scores.join(df_games, df_scores.game_id == df_games.game_id, "inner")
df_join.show()

+---+-------+---------+-----+------+----------+-------+----------+---------+
| id|game_id|     name|score|amount|  loggedin|game_id| game_name|game_type|
+---+-------+---------+-----+------+----------+-------+----------+---------+
|  1|     10|Mega Claw|   10|    34|20-01-1980|     10|Mouse Hunt|        R|
|  2|     10|   Pewter|   24|   223|23-02-1985|     10|Mouse Hunt|        R|
|  3|     10|Sir Tiger|  500|  5632|11-05-1990|     10|Mouse Hunt|        R|
|  4|     11|Sir Tiger|   50|    10|01-01-1901|     11|Bird Catch|        A|
+---+-------+---------+-----+------+----------+-------+----------+---------+



In [6]:
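def conv_game_type(game_type):
    # Map the one-letter game type code to its full name;
    # unknown codes and nulls become null instead of raising a KeyError
    mapping = {"A": "Action", "R": "RPG", "S": "Simulation"}
    return mapping.get(game_type)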

udf_conv_game_type = udf(conv_game_type, StringType())

In [7]:
def convert_date(dt):
    # Convert 'dd-MM-yyyy' to 'yyyyMMdd'; leave nulls as null
    if dt is None:
        return None
    day, month, year = dt.split('-')
    return f"{year}{month}{day}"

udf_convert_date = udf(convert_date, StringType())

In [8]:
df_final = df_join \
    .withColumn("totalscore", df_join.score * df_join.amount) \
    .withColumn("name", upper("name")) \
    .withColumn("game_type", udf_conv_game_type("game_type")) \
    .withColumn("loggedin", udf_convert_date("loggedin")) \
    .select('name', 'game_name', 'totalscore', 'loggedin', 'game_type') \
    .withColumnRenamed('loggedin', 'date_logged_in') \
    .withColumnRenamed('totalscore', 'total_score')


df_final.show()


+---------+----------+-----------+--------------+---------+
|     name| game_name|total_score|date_logged_in|game_type|
+---------+----------+-----------+--------------+---------+
|MEGA CLAW|Mouse Hunt|        340|      19800120|      RPG|
|   PEWTER|Mouse Hunt|       5352|      19850223|      RPG|
|SIR TIGER|Mouse Hunt|    2816000|      19900511|      RPG|
|SIR TIGER|Bird Catch|        500|      19010101|   Action|
+---------+----------+-----------+--------------+---------+



Save the result as a pipe-delimited CSV in the folder game_scores. Because of repartition(1), Spark writes only one CSV part file. This moves all the data into a single partition (and therefore onto a single machine) first, so it is not suitable for very large datasets.

The name of the file is chosen by PySpark (for example: part-00000-cf2aab4b-3b79-4342-849a-bbdf7aff4288-c000.csv).

In [9]:
df_final.repartition(1).write.options(header='True', delimiter='|').format('csv').mode('overwrite').save("game_scores")

Alternative: save as a single CSV file using Pandas. toPandas() collects all the data into the driver, so this also requires the data to fit on a single machine (and pandas must be installed).

In [10]:
df_final.toPandas().to_csv("game_scores.csv", index=False, header=True, sep='|')
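Both outputs use `|` as the separator, so whoever reads the file back has to pass the same separator. A small sketch with pandas, using a few values from the `df_final` output above and an in-memory buffer instead of `game_scores.csv`. Passing `dtype=str` for `date_logged_in` is an assumption about how you want the dates handled: it keeps the `yyyyMMdd` values as strings instead of letting pandas parse them as integers.

```python
import io

import pandas as pd

# A few values from the df_final output shown above
df = pd.DataFrame({
    "name": ["MEGA CLAW", "SIR TIGER"],
    "total_score": [340, 500],
    "date_logged_in": ["19800120", "19010101"],
})

# Write with the same options as the cell above, but to memory
buf = io.StringIO()
df.to_csv(buf, index=False, header=True, sep="|")

# Read it back: sep must match, dtype keeps the dates as strings
buf.seek(0)
df_back = pd.read_csv(buf, sep="|", dtype={"date_logged_in": str})
```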