# Data Encoding, Decoding and Flow

## Apache Parquet, ORC and Arrow

The `pyarrow` library reads (decodes) and writes (encodes) both Parquet and ORC files through one shared in-memory representation, the Arrow `pyarrow.Table`. Reading a Parquet or ORC file produces a `pyarrow.Table`, a columnar data structure that can be converted to a pandas DataFrame. A `pyarrow.Table` can in turn be written back out as either a Parquet or an ORC file.

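Parquet stores every column as one of a small set of physical types:

- boolean: 1 bit boolean
- int32: 32 bit signed ints
- int64: 64 bit signed ints
- int96: 96 bit values, deprecated and kept mainly for legacy timestamps
- float: IEEE 32-bit floating point values
- double: IEEE 64-bit floating point values
- byte_array: arbitrarily long byte arrays
- fixed_len_byte_array: fixed length byte arrays

Logical types are annotations layered on top of a physical type. For example:

- string: UTF-8 encoded strings, stored as byte_array
- enum: enumeration of strings, stored as byte_array
- temporal: date, time and timestamp types, stored as int32 or int64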

ORC has the following types:

- boolean: 1 bit boolean
- tinyint: 8 bit signed ints
- smallint: 16 bit signed ints
- int: 32 bit signed ints
- bigint: 64 bit signed ints
- float: IEEE 32-bit floating point values
- double: IEEE 64-bit floating point values
- string: UTF-8 encoded strings
- char: fixed-length strings
- varchar: strings with a declared maximum length
- binary: byte arrays
- timestamp: a date and time with nanosecond precision
- date: a calendar date without a time of day
- decimal: fixed-point decimals with up to 38 digits of precision
- list: an ordered collection of objects
- map: a collection of key-value pairs
- struct: an ordered collection of named fields
- union: a value that can be any one of several declared types

### Reading (Decoding) and Writing (Encoding) a Parquet File

Let's look at how to decode and encode a Parquet file with mock customer data.

In [1]:
import pyarrow as pa
import pyarrow.parquet as pq

In [2]:
table = pq.read_table('../data/userdata1.parquet')

In [3]:
table

pyarrow.Table
registration_dttm: timestamp[ns]
id: int32
first_name: string
last_name: string
email: string
gender: string
ip_address: string
cc: string
country: string
birthdate: string
salary: double
title: string
comments: string
----
registration_dttm: [[2016-02-03 07:55:29.000000000,2016-02-03 17:04:03.000000000,2016-02-03 01:09:31.000000000,2016-02-03 00:36:21.000000000,2016-02-03 05:05:31.000000000,...,2016-02-03 10:30:59.000000000,2016-02-03 17:16:53.000000000,2016-02-03 05:02:20.000000000,2016-02-03 02:41:32.000000000,2016-02-03 09:52:18.000000000]]
id: [[1,2,3,4,5,...,996,997,998,999,1000]]
first_name: [["Amanda","Albert","Evelyn","Denise","Carlos",...,"Dennis","Gloria","Nancy","Annie","Julie"]]
last_name: [["Jordan","Freeman","Morgan","Riley","Burns",...,"Harris","Hamilton","Morris","Daniels","Meyer"]]
email: [["ajordan0@com.com","afreeman1@is.gd","emorgan2@altervista.org","driley3@gmpg.org","cburns4@miitbeian.gov.cn",...,"dharrisrn@eepurl.com","ghamiltonro@rambler.ru","nmor

In [4]:
table.schema

registration_dttm: timestamp[ns]
id: int32
first_name: string
last_name: string
email: string
gender: string
ip_address: string
cc: string
country: string
birthdate: string
salary: double
title: string
comments: string

In [5]:
metadata = pq.read_metadata('../data/userdata1.parquet')

metadata

<pyarrow._parquet.FileMetaData object at 0x754351d5cdb0>
  created_by: parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
  num_columns: 13
  num_rows: 1000
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 1125

In [6]:
metadata.schema

<pyarrow._parquet.ParquetSchema object at 0x754351d6d200>
required group field_id=-1 hive_schema {
  optional int96 field_id=-1 registration_dttm;
  optional int32 field_id=-1 id;
  optional binary field_id=-1 first_name (String);
  optional binary field_id=-1 last_name (String);
  optional binary field_id=-1 email (String);
  optional binary field_id=-1 gender (String);
  optional binary field_id=-1 ip_address (String);
  optional binary field_id=-1 cc (String);
  optional binary field_id=-1 country (String);
  optional binary field_id=-1 birthdate (String);
  optional double field_id=-1 salary;
  optional binary field_id=-1 title (String);
  optional binary field_id=-1 comments (String);
}

In [7]:
metadata.row_group(0).column(10)

<pyarrow._parquet.ColumnChunkMetaData object at 0x754351d5d710>
  file_offset: 95403
  file_path: 
  physical_type: DOUBLE
  num_values: 1000
  path_in_schema: salary
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x754351d5d800>
      has_min_max: True
      min: 12380.49
      max: 286592.99
      null_count: 68
      distinct_count: 0
      num_values: 932
      physical_type: DOUBLE
      logical_type: None
      converted_type (legacy): NONE
  compression: UNCOMPRESSED
  encodings: ('PLAIN', 'BIT_PACKED', 'RLE')
  has_dictionary_page: False
  dictionary_page_offset: None
  data_page_offset: 95403
  total_compressed_size: 7631
  total_uncompressed_size: 7631

Select the first 3 rows of the table:

In [8]:
table.take([0,1,2])

pyarrow.Table
registration_dttm: timestamp[ns]
id: int32
first_name: string
last_name: string
email: string
gender: string
ip_address: string
cc: string
country: string
birthdate: string
salary: double
title: string
comments: string
----
registration_dttm: [[2016-02-03 07:55:29.000000000,2016-02-03 17:04:03.000000000,2016-02-03 01:09:31.000000000]]
id: [[1,2,3]]
first_name: [["Amanda","Albert","Evelyn"]]
last_name: [["Jordan","Freeman","Morgan"]]
email: [["ajordan0@com.com","afreeman1@is.gd","emorgan2@altervista.org"]]
gender: [["Female","Male","Female"]]
ip_address: [["1.197.201.2","218.111.175.34","7.161.136.94"]]
cc: [["6759521864920116","","6767119071901597"]]
country: [["Indonesia","Canada","Russia"]]
birthdate: [["3/8/1971","1/16/1968","2/1/1960"]]
...

Convert a Table to a DataFrame:

In [9]:
df = table.to_pandas()

In [10]:
df

| | registration_dttm | id | first_name | last_name | email | gender | ip_address | cc | country | birthdate | salary | title | comments |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2016-02-03 07:55:29 | 1 | Amanda | Jordan | ajordan0@com.com | Female | 1.197.201.2 | 6759521864920116 | Indonesia | 3/8/1971 | 49756.53 | Internal Auditor | 1E+02 |
| 1 | 2016-02-03 17:04:03 | 2 | Albert | Freeman | afreeman1@is.gd | Male | 218.111.175.34 | | Canada | 1/16/1968 | 150280.17 | Accountant IV | |
| 2 | 2016-02-03 01:09:31 | 3 | Evelyn | Morgan | emorgan2@altervista.org | Female | 7.161.136.94 | 6767119071901597 | Russia | 2/1/1960 | 144972.51 | Structural Engineer | |
| 3 | 2016-02-03 00:36:21 | 4 | Denise | Riley | driley3@gmpg.org | Female | 140.35.109.83 | 3576031598965625 | China | 4/8/1997 | 90263.05 | Senior Cost Accountant | |
| 4 | 2016-02-03 05:05:31 | 5 | Carlos | Burns | cburns4@miitbeian.gov.cn | | 169.113.235.40 | 5602256255204850 | South Africa | | | | |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 995 | 2016-02-03 10:30:59 | 996 | Dennis | Harris | dharrisrn@eepurl.com | Male | 178.180.111.236 | 374288806662929 | Greece | 7/8/1965 | 263399.54 | Editor | |
| 996 | 2016-02-03 17:16:53 | 997 | Gloria | Hamilton | ghamiltonro@rambler.ru | Female | 71.50.39.137 | | China | 4/22/1975 | 83183.54 | VP Product Management | |
| 997 | 2016-02-03 05:02:20 | 998 | Nancy | Morris | nmorrisrp@ask.com | | 6.188.121.221 | 3553564071014997 | Sweden | 5/1/1979 | | Junior Executive | |
| 998 | 2016-02-03 02:41:32 | 999 | Annie | Daniels | adanielsrq@squidoo.com | Female | 97.221.132.35 | 30424803513734 | China | 10/9/1991 | 18433.85 | Editor | |


You can convert the DataFrame back to a Table with `pa.Table.from_pandas` (`pa` is the alias we imported for `pyarrow`):

In [11]:
new_table = pa.Table.from_pandas(df)

new_table

pyarrow.Table
registration_dttm: timestamp[ns]
id: int32
first_name: string
last_name: string
email: string
gender: string
ip_address: string
cc: string
country: string
birthdate: string
salary: double
title: string
comments: string
----
registration_dttm: [[2016-02-03 07:55:29.000000000,2016-02-03 17:04:03.000000000,2016-02-03 01:09:31.000000000,2016-02-03 00:36:21.000000000,2016-02-03 05:05:31.000000000,...,2016-02-03 10:30:59.000000000,2016-02-03 17:16:53.000000000,2016-02-03 05:02:20.000000000,2016-02-03 02:41:32.000000000,2016-02-03 09:52:18.000000000]]
id: [[1,2,3,4,5,...,996,997,998,999,1000]]
first_name: [["Amanda","Albert","Evelyn","Denise","Carlos",...,"Dennis","Gloria","Nancy","Annie","Julie"]]
last_name: [["Jordan","Freeman","Morgan","Riley","Burns",...,"Harris","Hamilton","Morris","Daniels","Meyer"]]
email: [["ajordan0@com.com","afreeman1@is.gd","emorgan2@altervista.org","driley3@gmpg.org","cburns4@miitbeian.gov.cn",...,"dharrisrn@eepurl.com","ghamiltonro@rambler.ru","nmor

You can write the table back to a Parquet file:

In [12]:
pq.write_table(new_table, "../data/userdata2.parquet")

> 1. How many males and females are there?
>
> 2. What is the average salary for customers from China?
>
> 3. Create a new column `full_name` which combines `first_name` and `last_name` with a space in between in the dataframe. Then convert it back to a new Table and write it to a Parquet file.

In [None]:
df['gender'].value_counts()


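Run these aggregations on the DataFrame `df`, not on the Arrow table. A `pyarrow.Table` has no pandas-style `groupby` method (its own grouping method is spelled `group_by`), so calling `table.groupby(...)` fails with:

AttributeError: 'pyarrow.lib.Table' object has no attribute 'groupby'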

### Reading (Decoding) and Writing (Encoding) an ORC File

Let's look at how to decode and encode an ORC file with mock customer data.

In [1]:
import pyarrow as pa
from pyarrow import orc

In [2]:
table2 = orc.read_table('../data/userdata1.orc')

In [3]:
table2

pyarrow.Table
_col0: timestamp[ns]
_col1: int32
_col2: string
_col3: string
_col4: string
_col5: string
_col6: string
_col7: string
_col8: string
_col9: string
_col10: double
_col11: string
_col12: string
----
_col0: [[2016-02-03 13:36:39.000000000,2016-02-03 00:22:28.000000000,2016-02-03 18:29:04.000000000,2016-02-03 13:42:19.000000000,2016-02-03 00:15:29.000000000,...,2016-02-03 13:36:49.000000000,2016-02-03 04:39:01.000000000,2016-02-03 00:33:54.000000000,2016-02-03 00:15:08.000000000,2016-02-03 00:53:53.000000000]]
_col1: [[1,2,3,4,5,...,996,997,998,999,1000]]
_col2: [["Donald","Walter","Michelle","Lori","Howard",...,"Carol","Helen","Stephanie","Marie","Alice"]]
_col3: [["Lewis","Collins","Henderson","Hudson","Miller",...,"Warren","Fields","Sims","Medina","Peterson"]]
_col4: [["dlewis0@clickbank.net","wcollins1@bloglovin.com","mhenderson2@geocities.jp","lhudson3@dion.ne.jp","hmiller4@fema.gov",...,"cwarrenrn@geocities.jp","hfieldsro@comcast.net","ssimsrp@newyorker.com","mmedinarq@t

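The ORC file does not carry the original column names, only the placeholders `_col0` to `_col12`, so we need to set them manually. The columns have the same order and types as in the Parquet file, so we can reuse the column names from the Table we read earlier.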

In [None]:
table2 = table2.rename_columns(table.column_names)

In [None]:
table2

In [None]:
df2 = table2.to_pandas()

df2

You can write the table back to an ORC file:

In [None]:
orc.write_table(table2, "../data/userdata2.orc")

> 1. How many males and females are there from China?
>
> 2. Create a new column `age` which is computed from the birthdate in the dataframe. Then convert it back to a new Table and write it to an ORC file.