# Apache Arrow with Pandas and Apache Spark

In [1]:
import time

import pandas as pd
import pyarrow.csv as pc
import pyarrow.parquet as pq
from pyspark.sql import SparkSession

In [2]:
# Input file for every benchmark below; a bare relative path, resolved against the notebook's working directory.
csv_file_name = "Building_Permits.csv"

## Pandas

In [3]:
# Baseline: pandas' own CSV reader, timed from file on disk to in-memory DataFrame.
tic = time.perf_counter()
pandas_df = pd.read_csv(csv_file_name)  # result kept for inspection; timing is tic -> toc
toc = time.perf_counter()

  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,


In [4]:
# One print call, two lines of output: the result type, then the elapsed wall-clock time.
print(type(pandas_df), f'Pandas read (CSV) in {toc - tic:0.4f} seconds', sep='\n')

<class 'pandas.core.frame.DataFrame'>
Pandas read (CSV) in 1.4607 seconds


## PySpark

In [5]:
# Local Spark session. `local[1]` runs Spark in-process with a single worker thread,
# so the Spark timings below get no multi-core parallelism.
spark = (
    SparkSession.builder
    .master('local[1]')
    .appName('MyApp')
    .getOrCreate()
)

In [6]:
# PySpark's CSV reader does not use the first line as a header by default. Without
# `header=True`, the columns would be named `_c0`, `_c1`, ... and the header line would
# become a data row. The pandas read above takes the first line as the header, so this
# keeps the two frames consistent.
# NOTE: Spark DataFrames are lazy and `inferSchema` is off, so this cell does not scan
# the whole file. The data is only read when an action such as `toPandas()` runs.
tic = time.perf_counter()
spark_df = spark.read.csv(csv_file_name, header=True)
toc = time.perf_counter()

In [7]:
# One print call, two lines of output: the returned object's type, then the elapsed time.
print(type(spark_df), f'PySpark read (CSV) in {toc - tic:0.4f} seconds', sep='\n')

<class 'pyspark.sql.dataframe.DataFrame'>
PySpark read (CSV) in 3.7077 seconds


## Arrow

In [8]:
# Arrow CSV read timed together with the conversion to pandas, so the end product
# (a pandas DataFrame) matches the plain pandas read above.
tic = time.perf_counter()
table = pc.read_csv(csv_file_name)  # Arrow Table; also reused below to write the Parquet file
df = table.to_pandas()
toc = time.perf_counter()

In [9]:
# One print call, two lines of output: the converted frame's type, then the elapsed time.
print(type(df), f'Arrow read (CSV to Pandas) in {toc - tic:0.4f} seconds', sep='\n')

<class 'pandas.core.frame.DataFrame'>
Arrow read (CSV to Pandas) in 0.5448 seconds


In [10]:
tic = time.perf_counter()
spark_df.toPandas()
toc = time.perf_counter()

In [12]:
print(type(spark_df))
print(f'Spark DF to Pandas without Arrow in {toc - tic:0.4f} seconds')

<class 'pyspark.sql.dataframe.DataFrame'>
Spark DF to Pandas without Arrow in 8.2552 seconds


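Enabling Arrow initially ran into a memory-leak issue, so the OS environment variable `ARROW_PRE_0_15_IPC_FORMAT` was set (`export ARROW_PRE_0_15_IPC_FORMAT=1`). (Credit: https://george-jen.gitbook.io/data-science-and-apache-spark/enabling-for-conversion-to-from-pandas)

**Caveat:** this variable is a compatibility setting for Spark 2.3/2.4 used with pyarrow >= 0.15. The Arrow timing cell below prints a warning showing that this PySpark version rejects the variable. Because `spark.sql.execution.arrow.pyspark.fallback.enabled` is true, it then falls back to the non-Arrow conversion. As a result, the "using Arrow" timing does not actually measure Arrow; unset the variable to benchmark the real Arrow path.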

In [13]:
# Ask PySpark to use Arrow for toPandas(). With the fallback option enabled, it reverts to
# the non-Arrow path if Arrow cannot be used, as the next cell's output shows.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [14]:
tic = time.perf_counter()
spark_df.toPandas()
toc = time.perf_counter()

  Arrow legacy IPC format is not supported in PySpark, please unset ARROW_PRE_0_15_IPC_FORMAT
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


In [15]:
print(type(spark_df))
print(f'Spark DF to Pandas using Arrow in {toc - tic:0.4f} seconds')

<class 'pyspark.sql.dataframe.DataFrame'>
Spark DF to Pandas using Arrow in 6.4608 seconds


### With Parquet File

In [19]:
# Persist the Arrow table read from CSV above as Parquet so the Parquet read paths can be
# timed. (The variable's spelling is kept because later cells reference it.)
parquert_file_name = 'building_permits.parquet'
pq.write_table(table=table, where=parquert_file_name)

In [20]:
# pandas reading the Parquet file straight into a DataFrame.
tic = time.perf_counter()
pdf = pd.read_parquet(parquert_file_name)  # pandas DataFrame
toc = time.perf_counter()

In [21]:
print('Pandas read (Parquet) in {:0.4f} seconds'.format(toc - tic))  # elapsed wall-clock time

Pandas read (Parquet) in 0.5176 seconds


In [22]:
# Arrow reading the same Parquet file. Unlike the Arrow CSV timing, this stops at the
# Arrow Table (no `to_pandas()`), so it is not a like-for-like comparison with
# `pd.read_parquet` above. NOTE: this rebinds `table`, the name used by the Parquet-write cell.
tic = time.perf_counter()
table = pq.read_table(parquert_file_name)
toc = time.perf_counter()

In [23]:
print('Arrow read (Parquet) in {:0.4f} seconds'.format(toc - tic))  # elapsed wall-clock time

Arrow read (Parquet) in 0.1697 seconds
