<a href="https://colab.research.google.com/github/jimtoh/sctp_ntu/blob/main/5m_data_2_9_Hands_on_with_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Hands-on with Spark

![spark](https://cdn-images-1.medium.com/max/300/1*c8CtvqKJDVUnMoPGujF5fA.png)

PySpark is the Python API for Spark. It lets you write Spark applications in Python for tasks such as machine learning (ML), exploratory data analysis (EDA), and ETL pipelines for a data platform, all executed in a distributed manner.

![pyspark](https://editor.analyticsvidhya.com/uploads/20981sp3.JPG)

In simple terms, each time you submit a PySpark job, Spark translates your code into a directed acyclic graph (DAG) of stages and tasks that run on the Java Virtual Machine (JVM); it does not convert the code into a MapReduce program. Spark also uses lazy evaluation: it delays computation for as long as it can. When you define transformations, Spark only builds a plan for how to execute them and does not process any data. Only when you ask for a result (i.e., call an action) does it execute the plan, which is essentially all the transformations you have specified in your code.
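As a loose analogy for lazy evaluation, the sketch below uses plain Python generators (not Spark code) to build a pipeline that does no work until a result is requested. The `log` list and the function names are made up for illustration.

```python
# Analogy only: Python generators defer work much like Spark transformations do.
log = []  # records when each step actually runs


def double(values):
    for v in values:
        log.append(f"double({v})")
        yield v * 2


def keep_large(values, threshold):
    for v in values:
        log.append(f"filter({v})")
        if v > threshold:
            yield v


# "Transformations": building the pipeline runs nothing yet.
pipeline = keep_large(double([1, 2, 3]), threshold=2)
steps_before_action = len(log)

# "Action": asking for the result finally executes the whole plan.
result = list(pipeline)
steps_after_action = len(log)

print(steps_before_action, result, steps_after_action)
```

Nothing is logged while the pipeline is being assembled; all the work happens when `list(...)` asks for the values.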


For our lesson, we will be running Spark in **local** mode.

In this mode, Spark runs on a single machine, utilizing available cores (similar to Polars).
It's useful for development, learning, testing, and debugging since everything runs on the local machine without needing a cluster.

In actual production environments, Spark is usually run on a cluster of machines (YARN, Mesos or Kubernetes mode).

## Installing and Initializing Spark

First, we'll need to install Spark and its dependencies:

1.   Java 8
2.   Apache Spark with Hadoop
3.   Findspark (used to locate Spark on the system)


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# This link is down, need to use another mirror site
# !wget -q https://dlcdn.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
# Either retrieve from archive https://archive.apache.org/dist/spark/spark-3.5.5/ or from latest https://spark.apache.org/downloads.html
!wget -q https://archive.apache.org/dist/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz
!tar xf spark-3.5.6-bin-hadoop3.tgz
!pip install -q findspark

List the files in the current directory:

In [None]:
!ls

sample_data		 spark-3.5.6-bin-hadoop3.tgz
spark-3.5.6-bin-hadoop3  spark-3.5.6-bin-hadoop3.tgz.1


*Print* the current directory:

In [None]:
!pwd

/content


Set the environment variables so that findspark can locate Java and Spark on the system:

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.5.5-bin-hadoop3"
os.environ["SPARK_HOME"] = "/content/spark-3.5.6-bin-hadoop3"

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Invoking `spark` displays a summary of the SparkSession and its underlying SparkContext (version, master and app name):

In [None]:
spark

## Spark SQL and DataFrames

Spark SQL and DataFrames are higher-level modules of Apache Spark. They work together to make data processing with Spark more intuitive and better optimized, especially for those who come from an SQL or data analytics background.

- **SQL Interface to Spark**: Spark SQL lets users execute SQL queries alongside Spark programs.

- **DataFrames are an abstraction over RDDs**: A DataFrame is a distributed collection of data organized into named columns. Conceptually, it's equivalent to a table in a relational database or a data frame in R or Python (pandas), but with optimizations for distributed processing and scalability. While the RDD (Resilient Distributed Dataset) is the fundamental data structure in Spark, DataFrames provide a higher-level abstraction that is often easier to use and better optimized for many tasks.

### DataFrame Creation

In [None]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

In [None]:
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

When no schema is given, PySpark infers the DataFrame schema (column names and dtypes) from the data itself, much like pandas does.

In [None]:
df.show(5)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+




You can also pass an explicit schema:

In [None]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

A PySpark DataFrame is lazily evaluated, so invoking `df` only prints its schema; it does not trigger any computation or display rows. To see the data, explicitly call the `show` method:

In [None]:
df.show()
df.printSchema()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



### Getting Data In/Out

There are many data sources available in PySpark such as CSV, Parquet, ORC, JDBC, text, Avro, etc.

First, let's download files in various formats from GCS. This is the data we explored in Units 2.3 and 2.8.

In [None]:
!mkdir -p data

In [None]:
!wget -P data https://storage.googleapis.com/su-artifacts/movies_metadata.csv
!wget -P data https://storage.googleapis.com/su-artifacts/ratings.csv
!wget -P data https://storage.googleapis.com/su-artifacts/taxi_trip_data.csv
!wget -P data https://storage.googleapis.com/su-artifacts/userdata1.orc
!wget -P data https://storage.googleapis.com/su-artifacts/userdata1.parquet

--2025-06-10 01:06:11--  https://storage.googleapis.com/su-artifacts/movies_metadata.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 172.217.0.91, 172.217.12.27, 142.250.65.123, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|172.217.0.91|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 34445126 (33M) [text/csv]
Saving to: ‘data/movies_metadata.csv’


2025-06-10 01:06:16 (11.5 MB/s) - ‘data/movies_metadata.csv’ saved [34445126/34445126]

--2025-06-10 01:06:16--  https://storage.googleapis.com/su-artifacts/ratings.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 172.217.0.91, 172.217.12.27, 142.250.65.123, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|172.217.0.91|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 709550327 (677M) [text/csv]
Saving to: ‘data/ratings.csv’


2025-06-10 01:06:53 (19.1 MB/s) - ‘data/ratings.csv’ saved [709550327/709550327]

--2025-06-10 01:

In [None]:
!ls data

movies_metadata.csv  taxi_trip_data.csv  userdata1.parquet
ratings.csv	     userdata1.orc


In [None]:
df = spark.read.parquet('data/userdata1.parquet')
df.show()

+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|  registration_dttm| id|first_name|last_name|               email|gender|     ip_address|                 cc|             country| birthdate|   salary|               title|            comments|
+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|2016-02-03 07:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|    1.197.201.2|   6759521864920116|           Indonesia|  3/8/1971| 49756.53|    Internal Auditor|               1E+02|
|2016-02-03 17:04:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male| 218.111.175.34|                   |              Canada| 1/16/1968|150280.17|       Accountant IV|                    |
|2016-02-03 01:09:31|  3|

In [None]:
df = spark.read.orc('data/userdata1.orc')
df.show()

+-------------------+-----+--------+---------+--------------------+------+---------------+----------------+--------------------+----------+---------+--------------------+------+
|              _col0|_col1|   _col2|    _col3|               _col4| _col5|          _col6|           _col7|               _col8|     _col9|   _col10|              _col11|_col12|
+-------------------+-----+--------+---------+--------------------+------+---------------+----------------+--------------------+----------+---------+--------------------+------+
|2016-02-03 13:36:39|    1|  Donald|    Lewis|dlewis0@clickbank...|  Male|  102.22.124.20|                |           Indonesia|  7/9/1972|140249.37|Senior Financial ...|      |
|2016-02-03 00:22:28|    2|  Walter|  Collins|wcollins1@bloglov...|  Male|   247.28.26.93|3587726269478025|               China|          |     NULL|                    |      |
|2016-02-03 18:29:04|    3|Michelle|Henderson|mhenderson2@geoci...|Female| 193.68.146.150|                |   

In [None]:
movies = spark.read.csv('data/movies_metadata.csv', header=True)
ratings = spark.read.csv('data/ratings.csv', header=True)
taxi = spark.read.csv('data/taxi_trip_data.csv', header=True)

In [None]:
movies.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nu

In [None]:
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [None]:
taxi.printSchema()

root
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- imp_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_location_id: string (nullable = true)



In [None]:
movies = spark.read.csv('data/movies_metadata.csv', header=True, inferSchema=True, quote="\"", escape="\"")

In [None]:
movies.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nu

In [None]:
ratings = spark.read.csv('data/ratings.csv', header=True, inferSchema=True)

In [None]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



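Passing an explicit schema speeds up the read, because `inferSchema=True` makes Spark scan the data an extra time to work out the column types.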

In [None]:
dtypes = ['vendor_id integer',
 'pickup_datetime timestamp',
 'dropoff_datetime timestamp',
 'passenger_count integer',
 'trip_distance double',
 'rate_code integer',
 'store_and_fwd_flag string',
 'payment_type integer',
 'fare_amount double',
 'extra double',
 'mta_tax double',
 'tip_amount double',
 'tolls_amount double',
 'imp_surcharge double',
 'total_amount double',
 'pickup_location_id integer',
 'dropoff_location_id integer']

In [None]:
taxi = spark.read.csv('data/taxi_trip_data.csv', header=True,
                      schema=', '.join(dtypes))

In [None]:
taxi.printSchema()

root
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- imp_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)



There are several ways to view your DataFrame in PySpark:

1.   `df.take(5)` returns a list of five `Row` objects.
2.   `df.collect()` returns all of the data in the DataFrame to the driver. Be really careful when using it: with a large data set, you can easily crash the driver node.
3.   `df.show()` is the most commonly used method to view a DataFrame. It accepts a few parameters, such as the number of rows and truncation. For example, `df.show(5, False)` or `df.show(5, truncate=False)` shows five rows without truncating long values.
4.   `df.limit(5)` **returns a new DataFrame** containing at most the first n rows. As Spark is distributed in nature, there is no guarantee that `df.limit()` will give you the same rows each time.

In [None]:
movies.show(10)

+-----+---------------------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---------+-------+--------------------+--------+--------------------+--------------------+-----+------------+----------+
|adult|belongs_to_collection|  budget|              genres|            homepage|   id|  imdb_id|original_language|      original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|  revenue|runtime|    spoken_languages|  status|             tagline|               title|video|vote_average|vote_count|
+-----+---------------------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---------+-------+-----------

In [None]:
movies.show(5, truncate=False)

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-------------------------------------------------------------------------------------------------+------------------------------------+-----+---------+-----------------+---------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------

In [None]:
# can also be shown vertically
movies.show(1, vertical=True, truncate=False)

In [None]:
ratings.show(10)

In [None]:
taxi.show(10)

In [None]:
taxi.limit(5).show()

In [None]:
movies.describe().show()

In [None]:
ratings.describe().show()

In [None]:
taxi.describe().show()

### DataFrame Operations on Columns

We will go over the following in this section:

1.   Selecting a Column
2.   Selecting Multiple Columns
3.   Adding New Columns
4.   Renaming Columns
5.   Removing Columns


In [None]:
movies.id

Just like a DataFrame, a column reference is lazily evaluated: selecting a column does not trigger any computation but returns a `Column` instance.

A `Column` can be passed to `select`, which returns another DataFrame:

In [None]:
movies.select(movies.id).show()

In [None]:
movies.select(movies.title, movies.overview).show(truncate=False)

Creating a new column:

In [None]:
from pyspark.sql.functions import lit

In [None]:
# lit() creates a literal (constant) column: every row gets the given value
ratings.withColumn('review', lit('Great movie!')).show(10)

In [None]:
ratings.withColumn('review', lit('Great movie!')) \
       .withColumn('mood', lit(5)) \
       .show(10)

In [None]:
movies.withColumnRenamed('overview', 'summary').show(5)

In [None]:
ratings.drop('timestamp').show()

### Common Transformation Functions

In [None]:
# Functions available in PySpark
from pyspark.sql import functions
# We can use the dir function to view the available functions
print(dir(functions))

String functions:

In [None]:
from pyspark.sql.functions import concat, lower, upper, substring

In [None]:
movies.show(5)

In [None]:
movies.select(movies.title, movies.tagline, upper(movies.title), lower(movies.tagline),
              substring(movies.overview, 1, 10),
              concat(movies.title, lit(' Part 1')).alias('new_title')
              ).show()

Numeric functions:

In [None]:
# Note: these imports shadow Python's built-in max() and round() for the rest of the notebook
from pyspark.sql.functions import mean, max, round

In [None]:
ratings.select(mean(ratings.rating), max(ratings.rating)).show()

In [None]:
ratings.withColumn('new_rating', round(ratings.rating)).show()

Date/time functions:

In [None]:
from pyspark.sql.functions import to_date, to_timestamp, date_diff

In [None]:
ratings.withColumn('new_timestamp', to_timestamp(ratings.timestamp)).show()

Replace the unix timestamp with new timestamp:

In [None]:
ratings = ratings.withColumn('timestamp', to_timestamp(ratings.timestamp))

In [None]:
ratings.show()

Computing the difference in minutes is more tedious: we need to convert the timestamps to unix timestamps (seconds since epoch), compute the difference, and divide by 60.

In [None]:
taxi = taxi.withColumn('trip_duration', (taxi.dropoff_datetime.cast("long") - taxi.pickup_datetime.cast("long"))/60)

In [None]:
taxi.show(10)

### User-Defined Functions (UDF)

PySpark User-Defined Functions (UDFs) let you apply your own Python code to the columns of a distributed DataFrame. They are handy, but beware: they are generally slower than PySpark's built-in functions, because the data has to be moved between the JVM and Python.

In [None]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

In [None]:
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Add one to every value of the pandas Series.
    # Stay in integer arithmetic so the result matches the declared 'long' return type.
    return series + 1

In [None]:
taxi.select(taxi.passenger_count, pandas_plus_one(taxi.passenger_count)).show()

### DataFrame Operations on Rows

We will show the following in this section:

1.   Filtering Rows
2.   Getting Distinct Rows
3.   Sorting Rows

In [None]:
movies.filter(movies.status == 'In Production').show()

In [None]:
ratings.filter(ratings.rating > 4.5).show()

Use `&` and `|` for combining conditions as `and` and `or`:

In [None]:
ratings.filter((ratings.rating < 1.5) | (ratings.rating > 4.5)).show()

> 1. Filter `movies` for `status` to be `In Production` and `vote_average` greater than 6.
> 2. Filter `taxi` for `trip_duration` greater than 1 hour, `trip_distance` greater than 10 miles and `passenger_count` less than or equal to 2.

In [None]:
# 1. Filter movies for status 'In Production' and vote_average > 6
# This will show movies that are still in production and have a vote_average greater than 6.
# If this returns no records, it means there are no such movies in your dataset.
movies.filter(
    (movies.status == 'In Production') & (movies.vote_average > 6)
).show()

In [None]:
# 2. Filter taxi for trip_duration > 1 hour, trip_distance > 10 miles, and passenger_count <= 2
# First, create the trip_duration column (in minutes) using pickup and dropoff timestamps.
from pyspark.sql.functions import col

taxi = taxi.withColumn(
    'trip_duration',
    (col('dropoff_datetime').cast('long') - col('pickup_datetime').cast('long')) / 60
)

# Now filter for trips longer than 1 hour, distance > 10 miles, and 2 or fewer passengers.
taxi.filter(
    (col('trip_duration') > 60) &
    (col('trip_distance') > 10) &
    (col('passenger_count') <= 2)
).show()

In [None]:
movies.select('status').distinct().show()

In [None]:
taxi.orderBy('trip_duration').show()

In [None]:
taxi.orderBy('trip_distance', ascending=False).show()

### Grouping Data

In [None]:
movies.groupBy('status').count().show()

In [None]:
ratings.groupBy('movieId').avg('rating').show()

### Joining DataFrames

In [None]:
ratings_avg_by_movie = ratings.groupBy('movieId').avg('rating')

In [None]:
ratings_avg_by_movie.columns

In [None]:
# Join on the aggregated DataFrame's own movieId column
movies.join(ratings_avg_by_movie, movies.id == ratings_avg_by_movie.movieId, 'inner').select('title', 'vote_average', 'avg(rating)').show()

### Spark SQL

DataFrames and Spark SQL share the same execution engine, so they can be used interchangeably. For example, you can register a DataFrame as a temporary view and run SQL queries against it:

In [None]:
taxi.createOrReplaceTempView("taxi")
spark.sql("SELECT count(*) from taxi").show()

In addition, UDFs can be registered and invoked in SQL out of the box:

In [None]:
@pandas_udf("double")
def div(s1: pd.Series, s2: pd.Series) -> pd.Series:
    return s1 / s2

In [None]:
spark.udf.register("div", div)
spark.sql("SELECT trip_distance, trip_duration, div(trip_distance, trip_duration) AS speed FROM taxi").show()

In [None]:
taxi.selectExpr("div(trip_distance, trip_duration)").show()

## Pandas API on Spark

Pandas API on Spark, previously known as Koalas, is an API that brings the power and flexibility of the pandas API to Apache Spark. It essentially bridges the gap between pandas and Spark by providing a pandas-like API while leveraging Spark's distributed computing capabilities. This makes it easier for pandas users to scale their data science workflows without major changes to their code.

In [None]:
!pip install "numpy<2.0"

In [None]:
!pip show numpy

In [None]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps



Creating a pandas-on-Spark Series and DataFrame:

In [None]:
s = ps.Series([1, 3, 5, np.nan, 6, 8])

In [None]:
s

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

In [None]:
psdf = ps.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])

In [None]:
psdf

    a    b      c
10  1  100    one
20  2  200    two
30  3  300  three
40  4  400   four
50  5  500   five
60  6  600    six


Creating a pandas DataFrame by passing a numpy array, with a datetime index and labeled columns:

In [None]:
dates = pd.date_range('20130101', periods=6)

In [None]:
dates

DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
               '2013-01-05', '2013-01-06'],
              dtype='datetime64[ns]', freq='D')

In [None]:
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))

pdf

                   A         B         C         D
2013-01-01 -1.448749 -0.560778 -0.504981  0.847135
2013-01-02  0.495992 -1.298031 -1.734889 -0.407519
2013-01-03 -0.616116  -1.52329 -1.334426 -0.555653
2013-01-04  0.159137 -0.596122 -0.210986 -0.411034
2013-01-05   0.92304 -1.154306  0.459133 -1.293728
2013-01-06  0.688872 -0.108203  0.440477 -0.348606


We can now convert this pandas DataFrame to a pandas-on-Spark DataFrame:

In [None]:
psdf = ps.from_pandas(pdf)

In [None]:
type(psdf)

It looks and behaves largely the same as a pandas DataFrame.

In [None]:
psdf

Creating a Spark DataFrame from pandas DataFrame:

In [None]:
sdf = spark.createDataFrame(pdf)

In [None]:
sdf.show()

Also, it is possible to create a pandas-on-Spark DataFrame from Spark DataFrame easily.

In [None]:
psdf = sdf.pandas_api()

psdf

In [None]:
psdf.plot()

In [None]:
pdf.plot()