In [1]:
!pip install pyarrow

Defaulting to user installation because normal site-packages is not writeable


## Converting pandas dataframe to apache arrow table

In [2]:
import numpy as np
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({
    'one': [20, np.nan, 2.5], 
    'two': ['january', 'february', 'march'], 
    'three':[True, False, True]}, index=list('abc')) 
table = pa.Table.from_pandas(df)

## Pyarrow Table to pandas data frame

In [3]:
df_new = table.to_pandas()

In [4]:
df_new

Unnamed: 0,one,two,three
a,20.0,january,True
b,,february,False
c,2.5,march,True


## Read csv

In [5]:
from pyarrow import csv
fn = '../../../lap.csv'
table = csv.read_csv(fn)
df = table.to_pandas()

In [6]:
df

Unnamed: 0,packetFormat,gameMajorVersion,gameMinorVersion,packetVersion,packetId,sessionUID,sessionTime,frameIdentifier,playerCarIndex,secondaryPlayerCarIndex,...,safetyCarDelta,carPosition,currentLapNum,pitStatus,sector,currentLapInvalid,penalties,gridPosition,driverStatus,resultStatus
0,2020,1,18,1,2,6777929756610482444,0.019233,0,19,255,...,-0.0,10,1,0,2,0,0,10,4,2
1,2020,1,18,1,2,6777929756610482444,0.040277,1,19,255,...,-0.0,10,1,0,2,0,0,10,4,2
2,2020,1,18,1,2,6777929756610482444,0.072163,2,19,255,...,-0.0,10,1,0,2,0,0,10,4,2
3,2020,1,18,1,2,6777929756610482444,0.086520,3,19,255,...,-0.0,10,1,0,2,0,0,10,4,2
4,2020,1,18,1,2,6777929756610482444,0.116703,4,19,255,...,-0.0,10,1,0,2,0,0,10,4,2
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9691,2020,1,18,1,2,6777929756610482444,237.774536,9804,19,255,...,-0.0,5,3,0,2,0,0,10,4,2
9692,2020,1,18,1,2,6777929756610482444,237.791290,9805,19,255,...,-0.0,5,3,0,2,0,0,10,4,2
9693,2020,1,18,1,2,6777929756610482444,237.824585,9806,19,255,...,-0.0,5,3,0,2,0,0,10,4,2
9694,2020,1,18,1,2,6777929756610482444,237.841248,9807,19,255,...,-0.0,5,4,0,0,0,0,10,4,2


## Writing parquet file

In [7]:
import pyarrow.parquet as pq
pq.write_table(table, 'example.parquet')

## Reading a parquet file


In [8]:
table2 = pq.read_table('example.parquet')
table2

pyarrow.Table
packetFormat: int64
gameMajorVersion: int64
gameMinorVersion: int64
packetVersion: int64
packetId: int64
sessionUID: int64
sessionTime: double
frameIdentifier: int64
playerCarIndex: int64
secondaryPlayerCarIndex: int64
lastLapTime: double
currentLapTime: double
sector1TimeInMS: int64
sector2TimeInMS: int64
bestLapTime: double
bestLapNum: int64
bestLapSector1TimeInMS: int64
bestLapSector2TimeInMS: int64
bestLapSector3TimeInMS: int64
bestOverallSector1TimeInMS: int64
bestOverallSector1LapNum: int64
bestOverallSector2TimeInMS: int64
bestOverallSector2LapNum: int64
bestOverallSector3TimeInMS: int64
bestOverallSector3LapNum: int64
lapDistance: double
totalDistance: double
safetyCarDelta: double
carPosition: int64
currentLapNum: int64
pitStatus: int64
sector: int64
currentLapInvalid: int64
penalties: int64
gridPosition: int64
driverStatus: int64
resultStatus: int64
----
packetFormat: [[2020,2020,2020,2020,2020,...,2020,2020,2020,2020,2020]]
gameMajorVersion: [[1,1,1,1,1,...,1,1

## Reading some columns from a parquet file

In [9]:
table2 = pq.read_table('example.parquet', columns=['sessionTime', 'lapDistance'])
table2

pyarrow.Table
sessionTime: double
lapDistance: double
----
sessionTime: [[0.019233,0.040277,0.072163,0.08652,0.116703,...,237.774536,237.79129,237.824585,237.841248,237.858139]]
lapDistance: [[-46.12793,-46.147949,-46.147949,-46.147949,-46.147949,...,4290.197266,4291.570312,4294.297852,1.257812,2.651367]]

## Reading from file dataset

In [10]:
## Dataset_name is created below weird ass tutorial
dataset = pq.ParquetDataset('example.parquet')
table = dataset.read()
table

pyarrow.Table
packetFormat: int64
gameMajorVersion: int64
gameMinorVersion: int64
packetVersion: int64
packetId: int64
sessionUID: int64
sessionTime: double
frameIdentifier: int64
playerCarIndex: int64
secondaryPlayerCarIndex: int64
lastLapTime: double
currentLapTime: double
sector1TimeInMS: int64
sector2TimeInMS: int64
bestLapTime: double
bestLapNum: int64
bestLapSector1TimeInMS: int64
bestLapSector2TimeInMS: int64
bestLapSector3TimeInMS: int64
bestOverallSector1TimeInMS: int64
bestOverallSector1LapNum: int64
bestOverallSector2TimeInMS: int64
bestOverallSector2LapNum: int64
bestOverallSector3TimeInMS: int64
bestOverallSector3LapNum: int64
lapDistance: double
totalDistance: double
safetyCarDelta: double
carPosition: int64
currentLapNum: int64
pitStatus: int64
sector: int64
currentLapInvalid: int64
penalties: int64
gridPosition: int64
driverStatus: int64
resultStatus: int64
----
packetFormat: [[2020,2020,2020,2020,2020,...,2020,2020,2020,2020,2020]]
gameMajorVersion: [[1,1,1,1,1,...,1,1

## Transforming Parquet file into a pandas dataframe

In [11]:
pdf = pq.read_pandas('example.parquet', columns=['sessionTime']).to_pandas()
pdf

Unnamed: 0,sessionTime
0,0.019233
1,0.040277
2,0.072163
3,0.086520
4,0.116703
...,...
9691,237.774536
9692,237.791290
9693,237.824585
9694,237.841248


## Avoid pandas index

In [12]:
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_table(table, 'example_no_index.parquet')
t = pq.read_table('example_no_index.parquet')
t.to_pandas()
t

pyarrow.Table
packetFormat: int64
gameMajorVersion: int64
gameMinorVersion: int64
packetVersion: int64
packetId: int64
sessionUID: int64
sessionTime: double
frameIdentifier: int64
playerCarIndex: int64
secondaryPlayerCarIndex: int64
lastLapTime: double
currentLapTime: double
sector1TimeInMS: int64
sector2TimeInMS: int64
bestLapTime: double
bestLapNum: int64
bestLapSector1TimeInMS: int64
bestLapSector2TimeInMS: int64
bestLapSector3TimeInMS: int64
bestOverallSector1TimeInMS: int64
bestOverallSector1LapNum: int64
bestOverallSector2TimeInMS: int64
bestOverallSector2LapNum: int64
bestOverallSector3TimeInMS: int64
bestOverallSector3LapNum: int64
lapDistance: double
totalDistance: double
safetyCarDelta: double
carPosition: int64
currentLapNum: int64
pitStatus: int64
sector: int64
currentLapInvalid: int64
penalties: int64
gridPosition: int64
driverStatus: int64
resultStatus: int64
----
packetFormat: [[2020,2020,2020,2020,2020,...,2020,2020,2020,2020,2020]]
gameMajorVersion: [[1,1,1,1,1,...,1,1

## Check metadata

In [13]:
parquet_file = pq.ParquetFile('example.parquet')
parquet_file.metadata

<pyarrow._parquet.FileMetaData object at 0x7f4755f42de0>
  created_by: parquet-cpp-arrow version 12.0.0
  num_columns: 37
  num_rows: 9696
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 8324

## See data schema

In [14]:
parquet_file.schema

<pyarrow._parquet.ParquetSchema object at 0x7f4755f50e80>
required group field_id=-1 schema {
  optional int64 field_id=-1 packetFormat;
  optional int64 field_id=-1 gameMajorVersion;
  optional int64 field_id=-1 gameMinorVersion;
  optional int64 field_id=-1 packetVersion;
  optional int64 field_id=-1 packetId;
  optional int64 field_id=-1 sessionUID;
  optional double field_id=-1 sessionTime;
  optional int64 field_id=-1 frameIdentifier;
  optional int64 field_id=-1 playerCarIndex;
  optional int64 field_id=-1 secondaryPlayerCarIndex;
  optional double field_id=-1 lastLapTime;
  optional double field_id=-1 currentLapTime;
  optional int64 field_id=-1 sector1TimeInMS;
  optional int64 field_id=-1 sector2TimeInMS;
  optional double field_id=-1 bestLapTime;
  optional int64 field_id=-1 bestLapNum;
  optional int64 field_id=-1 bestLapSector1TimeInMS;
  optional int64 field_id=-1 bestLapSector2TimeInMS;
  optional int64 field_id=-1 bestLapSector3TimeInMS;
  optional int64 field_id=-1 best

## Timestamp
Remeber Pandas use nanoseconds so you can truncate in milliseconds for compatibility

In [15]:
pq.write_table(table, "example_timestamp.parquet", coerce_timestamps='ms')
pq.write_table(table, "example_trunc_timestamp.parquet", coerce_timestamps='ms', allow_truncated_timestamps=True)

## Compression
By default, Apache arrow uses snappy compression (not so compressed but easier access), although other codecs are allowed

In [16]:
pq.write_table(table, "example_snappy.parquet", compression='snappy')
pq.write_table(table, "example_gzip.parquet", compression='gzip')
pq.write_table(table, "example_brotli.parquet", compression='brotli')
pq.write_table(table, "example_none.parquet", compression='none')

Also, its possible to use more than one compression table

In [17]:
pq.write_table(table, 'example_diff_compr.parquet', 
               compression={b'sessionTime':'snappy', b'lapDistance':'gzip'})

## Write a partitioned parquet table

In [18]:
df = pd.DataFrame(
    {
        'one': [1, 2.5, 3],
        'two': ['Peru', 'Brazil', 'Canada'],
        'three': [True, False, True]
    },
    index=list('abc')
)
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, root_path='dataset_name', partition_cols=['one', 'two'])

## Reading from partitioned dataset

In [19]:
## Dataset_name is created below weird ass tutorial
dataset = pq.ParquetDataset('dataset_name')
table = dataset.read()
table

pyarrow.Table
three: bool
__index_level_0__: string
one: dictionary<values=string, indices=int32, ordered=0>
two: dictionary<values=string, indices=int32, ordered=0>
----
three: [[true],[true],...,[true],[true]]
__index_level_0__: [["a"],["a"],...,["c"],["c"]]
one: [  -- dictionary:
["1","2.5","3"]  -- indices:
[0],  -- dictionary:
["1","2.5","3"]  -- indices:
[0],...,  -- dictionary:
["1","2.5","3"]  -- indices:
[2],  -- dictionary:
["1","2.5","3"]  -- indices:
[2]]
two: [  -- dictionary:
["Peru","Brazil","Canada"]  -- indices:
[0],  -- dictionary:
["Peru","Brazil","Canada"]  -- indices:
[0],...,  -- dictionary:
["Peru","Brazil","Canada"]  -- indices:
[2],  -- dictionary:
["Peru","Brazil","Canada"]  -- indices:
[2]]

## Apache arrow with apache spark
Optimizing times, avoiding serialization and deserialization process
Apache arrow uses:
* Speeding conversion from pandas data frame to spark data frame
* Speeding conversion from Spark Data Frame to Pandas data frame
* Using with Pandas UDF (Vectorized UDF)
* Optimizing R with apache spark


Let's test the conversion among Pandas and Spark first without modifying anything and then with arrow

In [20]:
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable


In [21]:
from pyspark.sql import SparkSession
warehouseLocation = '/home/kincaid/Documents/ucags/data_science_master/1_sem/sist_intel/homeworks/s12_db'
spark = SparkSession \
    .builder.appName("demoSpark") \
    .config("spark.sql.warehouse.dir", warehouseLocation) \
    .enableHiveSupport() \
    .getOrCreate()


23/06/23 23:28:26 WARN Utils: Your hostname, MSI resolves to a loopback address: 127.0.1.1; using 172.19.207.62 instead (on interface eth0)
23/06/23 23:28:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/23 23:28:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [22]:
# Create test Spark DataFrame
from pyspark.sql.functions import rand
df = spark.range(1 << 22).toDF("id").withColumn("x", rand())
df.printSchema()

root
 |-- id: long (nullable = false)
 |-- x: double (nullable = false)



In [23]:
# Benchmark time
%time pdf = df.toPandas()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
%time pdf = df.toPandas()
pdf.describe()

                                                                                

CPU times: user 23.5 s, sys: 1.7 s, total: 25.2 s
Wall time: 31.9 s


23/06/23 23:29:11 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
                                                                                

CPU times: user 225 ms, sys: 244 ms, total: 469 ms
Wall time: 2.66 s


Unnamed: 0,id,x
count,4194304.0,4194304.0
mean,2097152.0,0.4999772
std,1210791.0,0.2887576
min,0.0,2.844156e-09
25%,1048576.0,0.2497804
50%,2097152.0,0.4998851
75%,3145727.0,0.7502333
max,4194303.0,1.0


test pandas to spark df

In [24]:
%time df = spark.createDataFrame(pdf)
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
%time df = spark.createDataFrame(pdf)
df.describe().show()

CPU times: user 438 ms, sys: 172 ms, total: 610 ms
Wall time: 789 ms


23/06/23 23:29:15 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.


CPU times: user 2min 15s, sys: 736 ms, total: 2min 16s
Wall time: 2min 16s


23/06/23 23:31:32 WARN TaskSetManager: Stage 2 contains a task of very large size (3981 KiB). The maximum recommended task size is 1000 KiB.
[Stage 2:>                                                        (0 + 16) / 16]

+-------+------------------+--------------------+
|summary|                id|                   x|
+-------+------------------+--------------------+
|  count|           4194304|             4194304|
|   mean|         2097151.5|  0.4999772238041727|
| stddev|1210791.4160691209| 0.28875759087101494|
|    min|                 0|2.844156021808430...|
|    max|           4194303|  0.9999999749245246|
+-------+------------------+--------------------+



                                                                                


# Cassandra

In [25]:
!pip install cassandra-driver

Defaulting to user installation because normal site-packages is not writeable


In [26]:
from cassandra.cluster import Cluster

cluster = Cluster(['0.0.0.0'],port=9042)
session = cluster.connect('f1_telem',wait_for_all_pools=True)
session.execute('USE f1_telem')
rows = session.execute('SELECT * FROM telem')
for row in rows:
    print(row.sessiontime_x,row.speed,row.throttle)

Traceback (most recent call last):
  File "cassandra/cluster.py", line 3541, in cassandra.cluster.ControlConnection._reconnect_internal
  File "cassandra/cluster.py", line 3563, in cassandra.cluster.ControlConnection._try_connect
  File "cassandra/cluster.py", line 1630, in cassandra.cluster.Cluster.connection_factory
  File "cassandra/connection.py", line 850, in cassandra.connection.Connection.factory
  File "/home/kincaid/.local/lib/python3.11/site-packages/cassandra/io/asyncorereactor.py", line 347, in __init__
    self._connect_socket()
  File "cassandra/connection.py", line 917, in cassandra.connection.Connection._connect_socket
ConnectionRefusedError: [Errno 111] Tried connecting to [('0.0.0.0', 9042)]. Last error: Connection refused
ERROR:cassandra.cluster:Control connection failed to connect, shutting down Cluster:
Traceback (most recent call last):
  File "cassandra/cluster.py", line 1700, in cassandra.cluster.Cluster.connect
  File "cassandra/cluster.py", line 3507, in cassa

NoHostAvailable: ('Unable to connect to any servers', {'0.0.0.0:9042': ConnectionRefusedError(111, "Tried connecting to [('0.0.0.0', 9042)]. Last error: Connection refused")})