<a href="https://colab.research.google.com/github/groda/big_data/blob/master/Encoding%2Bdataframe%2Bcolumns.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<a href="https://github.com/groda/big_data"><div><img src="https://github.com/groda/big_data/blob/master/logo_bdb.png?raw=true" align=right width="90"></div></a>

# Encode columns in a CSV file



I'm given a CSV file containing strings and I want to convert the characters to numeric values. I want to use different encodings of the characters on different columns or groups of columns.

Let's say for instance that I have two encodings __A__ and __B__:
 - in encoding __A__ I want to encode the character `a` with the number `1`, the character `b` with `2`, and `c` with `3`
 - in encoding __B__ I want to encode the character `a` with the number `2`, the character `b` with `3`, and `c` with `1`

If I use encoding __A__ to transform all columns in the table

| c1| c2 |
|-----|-----|
| a | a|
| b | b|
| c | b|

I obtain

| c1_enc| c2_enc |
|-----|-----|
| 1 | 1|
| 2 | 2|
| 3 | 2|

If `c1` is encoded with __A__ and `c2` is encoded with __B__ then the table becomes

| c1_enc| c2_enc |
|-----|-----|
| 1 | 2|
| 2 | 3|
| 3 | 3|
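
The toy example above can be reproduced in plain Python before involving Spark. The sketch below is only an illustration: the names `ENCODINGS` and `encode_table` and the row layout are assumptions made for this example, not part of the Spark code that follows.

```python
# Plain-Python illustration of the two toy encodings (no Spark involved).
# ENCODINGS and encode_table are hypothetical names used only in this sketch.
ENCODINGS = {
    "A": str.maketrans("abc", "123"),  # a->1, b->2, c->3
    "B": str.maketrans("abc", "231"),  # a->2, b->3, c->1
}

def encode_table(rows, column_encodings):
    """Encode each column of `rows` with the encoding assigned to it."""
    return [
        {
            f"{col}_enc": int(value.translate(ENCODINGS[column_encodings[col]]))
            for col, value in row.items()
        }
        for row in rows
    ]

table = [{"c1": "a", "c2": "a"}, {"c1": "b", "c2": "b"}, {"c1": "c", "c2": "b"}]

both_a = encode_table(table, {"c1": "A", "c2": "A"})    # A on every column
a_then_b = encode_table(table, {"c1": "A", "c2": "B"})  # A on c1, B on c2
print(both_a)
print(a_then_b)
```

The two printed lists correspond to the two encoded tables shown above.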

## Install PySpark

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 2.5 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=7d76dfe00695e6f6f3fec293598c5999726364bb607e11a1bc23dea8c8690fa9
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


## Download the data

Retrieve the CSV file `data-1600cols.csv` and write it to the local storage.

In [2]:
import requests
import csv

def download_csv(url, save_path):
    response = requests.get(url)

    if response.status_code == 200:
        with open(save_path, 'wb') as file:
            file.write(response.content)
        print(f"CSV file downloaded successfully and saved at: {save_path}")
    else:
        print(f"Failed to download CSV file. Status code: {response.status_code}")

url = "https://raw.githubusercontent.com/groda/big_data/master/data-1600cols.csv"
save_path = "data-1600cols.csv"

download_csv(url, save_path)

CSV file downloaded successfully and saved at: data-1600cols.csv


## Initialize Spark session

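A `SparkSession` is the entry point for creating and loading dataframes. Its underlying `SparkContext` (`spark.sparkContext`) gives access to the Spark configuration and lets me cancel a job, get the status of a job, etc.

Load the CSV file `data-1600cols.csv` into a Spark dataframe using the file's header as column names.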

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

spark = SparkSession \
            .builder \
            .master("local") \
            .appName("Encode multiple columns") \
            .getOrCreate()

# SQLContext is deprecated since Spark 3.0; it is kept here because later cells use it
sqlContext = SQLContext(spark.sparkContext, spark)
# read the CSV directly through the SparkSession
df = spark.read.csv("data-1600cols.csv", header=True)



Check configuration

In [4]:
spark.sparkContext.getConf().getAll()

[('spark.master', 'local'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'),
 ('spark.driver.host', '6920187c1a5f'),
 ('spark.executor.id', 'driver'),
 

Check size of the dataframe (number of rows and columns)

In [5]:
print('Number of rows: {}\nNumber of columns: {}'.format(df.count(),len(df.columns)))

Number of rows: 1000
Number of columns: 1600


Check whether column `V2` contains any nulls (this inspects a single column, not the whole dataframe)

In [6]:
df.where(df.V2.isNull()).collect()

[]
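
The cell above only inspects column `V2`. As a rough driver-side cross-check that does not use Spark, the sketch below counts empty fields per column with Python's `csv` module. It assumes that missing values appear as empty fields in the file; the function name `count_empty_fields` and the inline sample are made up for this illustration.

```python
import csv
import io

def count_empty_fields(lines):
    """Return {column: number of empty fields}, listing only columns that have any."""
    reader = csv.DictReader(lines)
    counts = {}
    for row in reader:
        for column, value in row.items():
            # None means the row was shorter than the header
            if value is None or value == "":
                counts[column] = counts.get(column, 0) + 1
    return counts

# Tiny made-up sample; for the real file pass open("data-1600cols.csv", newline="").
sample = io.StringIO("V1,V2,V3\nj,n,d\nd,,w\np,h,\n")
empty_counts = count_empty_fields(sample)
print(empty_counts)
```

An empty dictionary would mean that no column has an empty field.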

Show a couple of columns

In [7]:
df.select('V1','V2','V3').show()

+---+---+---+
| V1| V2| V3|
+---+---+---+
|  j|  n|  d|
|  d|  n|  w|
|  p|  h|  a|
|  b|  h|  e|
|  z|  x|  u|
|  b|  e|  v|
|  y|  t|  x|
|  i|  r|  e|
|  x|  e|  g|
|  l|  j|  z|
|  l|  v|  l|
|  z|  n|  h|
|  s|  m|  c|
|  g|  m|  f|
|  i|  p|  n|
|  i|  f|  b|
|  u|  n|  j|
|  s|  o|  e|
|  k|  y|  c|
|  h|  b|  i|
+---+---+---+
only showing top 20 rows



## First approach

Using the `translate` function from `pyspark.sql` and adding a new column with `withColumn` at each step. Test on a small dataframe `test_df`.

In [8]:
import pyspark.sql.functions as f

test_df = sqlContext.createDataFrame([('a', 'a'), ('b', 'b'), ('c', 'b')], ['c1', 'c2'])
test_df.show()

chars = "abc"
A = "123" # encoding A
B = "231" # encoding B


for col_name in ["c1", "c2"]:
    test_df = test_df.withColumn(col_name+'_enc', f.translate(f.col(col_name), chars, A))

test_df.show()

+---+---+
| c1| c2|
+---+---+
|  a|  a|
|  b|  b|
|  c|  b|
+---+---+

+---+---+------+------+
| c1| c2|c1_enc|c2_enc|
+---+---+------+------+
|  a|  a|     1|     1|
|  b|  b|     2|     2|
|  c|  b|     3|     2|
+---+---+------+------+



Try out this approach on the big dataframe, applying the function to a few columns. I define two random encodings, `encodingA` and `encodingB`, and apply each of them to two different columns.

In [9]:
import string
import random

# set a random seed
random.seed(30)

chars = string.ascii_lowercase
encodingA = ''.join(random.choice(string.digits) for i in range(len(chars)))
encodingB = ''.join(random.choice(string.digits) for i in range(len(chars)))

print("Encodings:")
print(chars)
print(encodingA)
print(encodingB)
print("-"*26)
new_df=df

for col_name in ["V1", "V3"]:  # apply encodingA to columns V1, V3
    new_df=new_df.withColumn(col_name+'_enc',f.translate(f.col(col_name), chars, encodingA))
for col_name in ["V2", "V4"]:  # apply encodingB to columns V2, V4
    new_df=new_df.withColumn(col_name+'_enc',f.translate(f.col(col_name), chars, encodingB))

new_df.select("V1","V2","V3","V4", "V1_enc", "V2_enc", "V3_enc", "V4_enc").show()

Encodings:
abcdefghijklmnopqrstuvwxyz
84909340662170830129865816
03946914819742444812351068
--------------------------
+---+---+---+---+------+------+------+------+
| V1| V2| V3| V4|V1_enc|V2_enc|V3_enc|V4_enc|
+---+---+---+---+------+------+------+------+
|  j|  n|  d|  m|     6|     2|     0|     4|
|  d|  n|  w|  y|     0|     2|     5|     6|
|  p|  h|  a|  h|     3|     4|     8|     4|
|  b|  h|  e|  t|     4|     4|     9|     2|
|  z|  x|  u|  d|     6|     0|     8|     4|
|  b|  e|  v|  j|     4|     6|     6|     1|
|  y|  t|  x|  w|     1|     2|     8|     1|
|  i|  r|  e|  q|     6|     8|     9|     4|
|  x|  e|  g|  s|     8|     6|     4|     1|
|  l|  j|  z|  h|     1|     1|     6|     4|
|  l|  v|  l|  w|     1|     5|     1|     1|
|  z|  n|  h|  z|     6|     2|     0|     8|
|  s|  m|  c|  z|     2|     4|     9|     8|
|  g|  m|  f|  j|     4|     4|     3|     1|
|  i|  p|  n|  h|     6|     4|     0|     4|
|  i|  f|  b|  r|     6|     9|     4|     8|
|  u|  
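
Because each of the 26 letters is mapped to one of only 10 digits, a random encoding like this cannot be one-to-one: several letters share a digit, so the encoded columns cannot be decoded unambiguously. The sketch below makes this visible for the `encodingA` string printed above (copied literally from that output); `letters_by_digit` is a name introduced only for this illustration.

```python
import string
from collections import defaultdict

chars = string.ascii_lowercase
encodingA = "84909340662170830129865816"  # copied from the printed output

# Group the letters by the digit they are encoded to.
letters_by_digit = defaultdict(list)
for letter, digit in zip(chars, encodingA):
    letters_by_digit[digit].append(letter)

for digit in sorted(letters_by_digit):
    print(digit, "".join(letters_by_digit[digit]))

# With 26 letters and at most 10 distinct digits this is always False.
is_one_to_one = len(set(encodingA)) == len(chars)
print("one-to-one:", is_one_to_one)
```

For example, `i`, `j`, `v` and `z` are all encoded as `6`, which matches the `6` shown for `j`, `z` and `i` in column `V1_enc`.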

Apply the encodings to the same 4 columns, this time overwriting the original columns instead of adding new ones

In [10]:
new_df=df

for col_name in ["V1", "V3"]:  # apply encodingA to columns V1, V3
    new_df = new_df.withColumn(col_name,f.translate(f.col(col_name), chars, encodingA))
for col_name in ["V2", "V4"]:  # apply encodingB to columns V2, V4
    new_df = new_df.withColumn(col_name,f.translate(f.col(col_name), chars, encodingB))

new_df.select("V1","V2","V3","V4").show(3)

+---+---+---+---+
| V1| V2| V3| V4|
+---+---+---+---+
|  6|  2|  0|  4|
|  0|  2|  5|  6|
|  3|  4|  8|  4|
+---+---+---+---+
only showing top 3 rows



Check: these are the same values as in the first three rows of the `_enc` columns computed earlier.


| V1 | V2 | V3 | V4 |
|---|---|---|---|
| 6 | 2 | 0 | 4 |
| 0 | 2 | 5 | 6 |
| 3 | 4 | 8 | 4 |

When applying the encodings to thousands of columns, the previous approach is too slow. The reason is that each call to `withColumn` creates a new dataframe and adds another projection to the query plan, so the plan keeps growing with every transformation.

Split the columns into even- and odd-numbered ones and apply a different encoding to each set.

In [11]:
cols_e = ["V"+str(i) for i in range(2,5,2)]
cols_o = ["V"+str(i) for i in range(1,4,2)]

print(cols_e)
print(cols_o)

new_df=df

# works with a few columns (4 in total in this example) but too slow for thousands of columns
for col_name in cols_o:  # apply encodingA to odd-numbered columns
    new_df=new_df.withColumn(col_name,f.translate(f.col(col_name), chars, encodingA))
for col_name in cols_e:  # apply encodingB to even-numbered columns
    new_df=new_df.withColumn(col_name,f.translate(f.col(col_name), chars, encodingB))

new_df.select(["V"+str(i) for i in range(1,5)]).show(3)

['V2', 'V4']
['V1', 'V3']
+---+---+---+---+
| V1| V2| V3| V4|
+---+---+---+---+
|  6|  2|  0|  4|
|  0|  2|  5|  6|
|  3|  4|  8|  4|
+---+---+---+---+
only showing top 3 rows
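
The even/odd split generalizes to any number of columns. A minimal sketch in plain Python, assuming column names of the form `V<number>` as in this dataset; `split_by_parity` is a hypothetical helper, not part of the notebook.

```python
def split_by_parity(columns, prefix="V"):
    """Split names like 'V17' into (odd, even) lists by their numeric suffix."""
    odd, even = [], []
    for name in columns:
        number = int(name[len(prefix):])
        (odd if number % 2 else even).append(name)
    return odd, even

# Same four columns as in the cell above.
cols_o, cols_e = split_by_parity(["V1", "V2", "V3", "V4"])
print(cols_o)
print(cols_e)
```

If every column of the dataframe follows the `V<number>` pattern, `split_by_parity(df.columns)` would return the two groups for the whole dataframe.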



## Second approach
Use a `udf` (user-defined function) and build all new columns in a single `select` instead of calling `withColumn` repeatedly.

In [12]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType

# define an encoding as a list of two strings of equal length

o = ["abcdefghijklmnopqrstuvwxyz", encodingA]

def enc(*a):
    # encode string s with encoding o
    s = a[0]
    for i in range(len(o[0])):
        if s == o[0][i]:
            return o[1][i]
    # values that are not in the alphabet are returned unchanged
    return s

# create udf
encode_udf = udf(enc, StringType())

cols_o = ["V"+str(i) for i in range(7) if i%2==1]
print(cols_o)

(
df.select("V1","V3","V5",
           encode_udf("V1").alias("V1_enc"),
           encode_udf("V3").alias("V3_enc"),
           encode_udf("V5").alias("V5_enc"))
    .show(10)
)


['V1', 'V3', 'V5']
+---+---+---+------+------+------+
| V1| V3| V5|V1_enc|V3_enc|V5_enc|
+---+---+---+------+------+------+
|  j|  d|  s|     6|     0|     2|
|  d|  w|  l|     0|     5|     1|
|  p|  a|  w|     3|     8|     5|
|  b|  e|  x|     4|     9|     8|
|  z|  u|  b|     6|     8|     4|
|  b|  v|  u|     4|     6|     8|
|  y|  x|  z|     1|     8|     6|
|  i|  e|  k|     6|     9|     2|
|  x|  g|  s|     8|     4|     2|
|  l|  z|  l|     1|     6|     1|
+---+---+---+------+------+------+
only showing top 10 rows
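
The function `enc` scans the alphabet linearly for every value. A dictionary lookup does the same job in one step, and a small factory avoids writing one function per encoding. The following is a plain-Python sketch; `make_encoder` is a name invented here, and the two encoding strings are copied from the output printed earlier. Each returned function takes a single string, so it could presumably be wrapped with `udf(..., StringType())` in the same way as `enc`.

```python
import string

chars = string.ascii_lowercase
encodingA = "84909340662170830129865816"  # copied from the earlier output
encodingB = "03946914819742444812351068"  # copied from the earlier output

def make_encoder(encoding, alphabet=chars):
    """Return a function that maps one character according to `encoding`."""
    table = dict(zip(alphabet, encoding))

    def encode(s):
        # Values not in the alphabet (including None) are returned unchanged,
        # which mirrors the fallback `return s` in enc().
        return table.get(s, s)

    return encode

encode_a = make_encoder(encodingA)
encode_b = make_encoder(encodingB)
print(encode_a("j"), encode_b("n"))
```

Building the dictionary once per encoding keeps the per-value work to a single lookup.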



Now encode an increasing number of odd-numbered columns with `encodingA`, building all encoded columns in a single `select`.

In [13]:
# apply function to 50 columns
new_df=df.select([encode_udf("V"+str(i)).alias("V"+str(i)+"_enc") for i in range(1,100,2)])
new_df.select(["V"+str(i)+"_enc" for i in range(1,21,2)]).show(10)

+------+------+------+------+------+-------+-------+-------+-------+-------+
|V1_enc|V3_enc|V5_enc|V7_enc|V9_enc|V11_enc|V13_enc|V15_enc|V17_enc|V19_enc|
+------+------+------+------+------+-------+-------+-------+-------+-------+
|     6|     0|     2|     6|     9|      8|      2|      2|      3|      6|
|     0|     5|     1|     8|     0|      2|      9|      6|      8|      2|
|     3|     8|     5|     4|     8|      3|      9|      0|      2|      9|
|     4|     9|     8|     0|     9|      0|      9|      2|      8|      0|
|     6|     8|     4|     0|     9|      2|      8|      6|      6|      6|
|     4|     6|     8|     5|     8|      6|      5|      6|      6|      4|
|     1|     8|     6|     0|     4|      8|      4|      5|      5|      1|
|     6|     9|     2|     5|     8|      8|      5|      4|      0|      1|
|     8|     4|     2|     2|     2|      2|      9|      7|      8|      0|
|     1|     6|     1|     9|     6|      2|      6|      1|      1|      2|

In [14]:
# apply function to 100 columns
new_df=df.select([encode_udf("V"+str(i)).alias("V"+str(i)+"_enc") for i in range(1,201,2)])
new_df.select(["V"+str(i)+"_enc" for i in range(1,21,2)]).show(10)

+------+------+------+------+------+-------+-------+-------+-------+-------+
|V1_enc|V3_enc|V5_enc|V7_enc|V9_enc|V11_enc|V13_enc|V15_enc|V17_enc|V19_enc|
+------+------+------+------+------+-------+-------+-------+-------+-------+
|     6|     0|     2|     6|     9|      8|      2|      2|      3|      6|
|     0|     5|     1|     8|     0|      2|      9|      6|      8|      2|
|     3|     8|     5|     4|     8|      3|      9|      0|      2|      9|
|     4|     9|     8|     0|     9|      0|      9|      2|      8|      0|
|     6|     8|     4|     0|     9|      2|      8|      6|      6|      6|
|     4|     6|     8|     5|     8|      6|      5|      6|      6|      4|
|     1|     8|     6|     0|     4|      8|      4|      5|      5|      1|
|     6|     9|     2|     5|     8|      8|      5|      4|      0|      1|
|     8|     4|     2|     2|     2|      2|      9|      7|      8|      0|
|     1|     6|     1|     9|     6|      2|      6|      1|      1|      2|

In [15]:
# apply function to 200 columns
new_df=df.select([encode_udf("V"+str(i)).alias("V"+str(i)+"_enc") for i in range(1,401,2)])
new_df.select(["V"+str(i)+"_enc" for i in range(381,401,2)]).show(10)

+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|V381_enc|V383_enc|V385_enc|V387_enc|V389_enc|V391_enc|V393_enc|V395_enc|V397_enc|V399_enc|
+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|       4|       8|       6|       8|       9|       6|       8|       8|       6|       8|
|       6|       4|       8|       4|       1|       6|       6|       0|       9|       6|
|       9|       6|       8|       8|       2|       4|       0|       5|       9|       1|
|       3|       6|       6|       6|       1|       9|       0|       0|       4|       4|
|       6|       0|       1|       0|       1|       1|       2|       9|       2|       8|
|       0|       4|       3|       4|       8|       4|       8|       6|       2|       1|
|       1|       1|       8|       2|       6|       2|       1|       5|       1|       4|
|       4|       4|       1|       0|       1|       2|       4|       8|       

In [16]:
# apply function to 400 columns (odd-numbered columns V1 to V799)

new_df = df.select([encode_udf("V"+str(i)).alias("V"+str(i)+"_enc") for i in range(1,801,2)])

new_df.select(["V"+str(i)+"_enc" for i in range(781,801,2)]).show(10)

+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|V781_enc|V783_enc|V785_enc|V787_enc|V789_enc|V791_enc|V793_enc|V795_enc|V797_enc|V799_enc|
+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|       2|       8|       0|       9|       9|       3|       0|       2|       6|       8|
|       0|       9|       3|       5|       9|       9|       0|       1|       5|       9|
|       6|       4|       8|       8|       3|       8|       5|       0|       3|       0|
|       4|       7|       0|       6|       2|       1|       0|       6|       0|       4|
|       3|       3|       7|       6|       8|       8|       6|       4|       0|       6|
|       8|       8|       1|       8|       8|       4|       4|       5|       4|       2|
|       9|       0|       8|       2|       0|       0|       6|       0|       6|       2|
|       1|       2|       5|       6|       6|       9|       2|       7|       

Now I want to apply two different UDFs: one to the odd-numbered columns and one to the even-numbered columns

In [17]:
o = ["abcdefghijklmnopqrstuvwxyz", encodingA]
e = ["abcdefghijklmnopqrstuvwxyz", encodingB]

# define two encoding functions

def enc1(*a):
    # encode string s with encoding o
    s = a[0]
    for i in range(len(o[0])):
        if s == o[0][i]:
            return o[1][i]
    return s

def enc2(*a):
    # encode string s with encoding e
    s = a[0]
    for i in range(len(e[0])):
        if s == e[0][i]:
            return e[1][i]
    return s

# create udfs
encode_udf1 = udf(enc1, StringType())
encode_udf2 = udf(enc2, StringType())

new_df = df.select([encode_udf1("V"+str(i)).alias("V"+str(i)+"_enc") for i in range(1,800,2)]+
                  [encode_udf2("V"+str(i)).alias("V"+str(i)+"_enc") for i in range(2,801,2)])
new_df.select(["V"+str(i)+"_enc" for i in range(1,5)]+["V"+str(i)+"_enc" for i in range(795,801)]).show(10)

+------+------+------+------+--------+--------+--------+--------+--------+--------+
|V1_enc|V2_enc|V3_enc|V4_enc|V795_enc|V796_enc|V797_enc|V798_enc|V799_enc|V800_enc|
+------+------+------+------+--------+--------+--------+--------+--------+--------+
|     6|     2|     0|     4|       2|       4|       6|       4|       8|       8|
|     0|     2|     5|     6|       1|       5|       5|       6|       9|       3|
|     3|     4|     8|     4|       0|       0|       3|       2|       0|       6|
|     4|     4|     9|     2|       6|       4|       0|       4|       4|       8|
|     6|     0|     8|     4|       4|       4|       0|       6|       6|       4|
|     4|     6|     6|     1|       5|       5|       4|       9|       2|       1|
|     1|     2|     8|     1|       0|       4|       6|       8|       2|       4|
|     6|     8|     9|     4|       7|       9|       6|       4|       0|       1|
|     8|     6|     4|     1|       9|       8|       8|       8|       1|  

## Export dataframe to file

In [18]:
import time
timestamp = time.strftime("%Y%m%d%H%M%S")
new_df.write.csv('out'+timestamp+'.csv', sep=',')
print('saved out{}.csv'.format(timestamp))

saved out20240423190117.csv


Save to CSV with headers

In [19]:
timestamp = time.strftime("%Y%m%d%H%M%S")
new_df.write.csv('out'+timestamp+'.csv', sep=',', header = True)
print('saved out{}.csv'.format(timestamp))

saved out20240423190129.csv


In [20]:
!ls out*

out20240423190117.csv:
part-00000-1c000852-762a-49a9-a6d7-2c96a2e72d8c-c000.csv  _SUCCESS

out20240423190129.csv:
part-00000-2715ade8-7b69-456d-bfed-a4ed900472b3-c000.csv  _SUCCESS
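
As the listing shows, Spark writes a directory (named `out<timestamp>.csv` here) that contains one or more `part-*` files plus a `_SUCCESS` marker, not a single CSV file. A small sketch for locating the data files with `pathlib`; `find_part_files` is a hypothetical helper, and the demo directory is created only to keep the example self-contained.

```python
import tempfile
from pathlib import Path

def find_part_files(output_dir):
    """Return the sorted part-*.csv files inside a Spark CSV output directory."""
    return sorted(Path(output_dir).glob("part-*.csv"))

# Build a throwaway directory that mimics the layout shown above.
with tempfile.TemporaryDirectory() as tmp:
    out_dir = Path(tmp) / "out-demo.csv"
    out_dir.mkdir()
    (out_dir / "part-00000-demo-c000.csv").write_text("V1_enc,V2_enc\n6,2\n")
    (out_dir / "_SUCCESS").touch()

    part_names = [p.name for p in find_part_files(out_dir)]

print(part_names)
```

With the real output, the directory name printed by the export cell would be passed to `find_part_files`.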


## Useful commands for checking system resources

The `free -h` and `lscpu` commands are useful for retrieving information about system resources in a Linux environment.

The `free -h` command displays the system's memory usage. With the `-h` option, sizes are shown in human-readable units such as mebibytes (`Mi`) and gibibytes (`Gi`).

In [21]:
!free -h

               total        used        free      shared  buff/cache   available
Mem:            12Gi       1.6Gi       3.7Gi       1.0Mi       7.3Gi        10Gi
Swap:             0B          0B          0B


The `lscpu` command displays detailed information about the CPU architecture.

In [22]:
!lscpu

Architecture:            x86_64
  CPU op-mode(s):        32-bit, 64-bit
  Address sizes:         46 bits physical, 48 bits virtual
  Byte Order:            Little Endian
CPU(s):                  2
  On-line CPU(s) list:   0,1
Vendor ID:               GenuineIntel
  Model name:            Intel(R) Xeon(R) CPU @ 2.20GHz
    CPU family:          6
    Model:               79
    Thread(s) per core:  2
    Core(s) per socket:  1
    Socket(s):           1
    Stepping:            0
    BogoMIPS:            4400.44
    Flags:               fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clf
                         lush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_
                         good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fm
                         a cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hyp
                         ervisor lahf_lm abm 3dnowprefetch invpcid_single ssbd i

In the context of distributed computing, the following values reported by `lscpu` are of particular interest:

*   the number of CPUs
*   cores per socket
*   threads per core
*   sockets

Understanding these parameters is crucial for assessing the system's potential parallelism.

Sockets represent the number of physical processors. Each processor can have one or more cores, and each core can execute one or two threads concurrently.

Finally, the number of CPUs is the total count of logical processing units in the system, that is, sockets × cores per socket × threads per core. This is the theoretical upper limit on the number of tasks that can be executed concurrently, which is useful to know when tuning parallelism in distributed computing scenarios.

For instance, if you have

```
Thread(s) per core:    2
Core(s) per socket:    4
Socket(s):             1
```

then the total number of independent processing units is

$$ 1 \times 4 \times 2 = 8 $$
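
The same arithmetic can be done programmatically. The sketch below parses `lscpu`-style text and multiplies the three values; `logical_cpus` is a name made up for this example, and the input is the sample from above rather than live command output.

```python
def logical_cpus(lscpu_text):
    """Compute sockets x cores per socket x threads per core from lscpu-style text."""
    fields = {}
    for line in lscpu_text.splitlines():
        # split on the first colon only, so values may contain colons
        key, sep, value = line.partition(":")
        if sep:
            fields[key.strip()] = value.strip()
    return (
        int(fields["Socket(s)"])
        * int(fields["Core(s) per socket"])
        * int(fields["Thread(s) per core"])
    )

sample = """\
Thread(s) per core:    2
Core(s) per socket:    4
Socket(s):             1
"""
total = logical_cpus(sample)
print(total)  # 8
```

On a live system, `os.cpu_count()` reports the number of logical CPUs directly.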

See also: [How many physical CPUs does my machine have?](https://superuser.com/questions/1691479/how-many-physical-cpus-does-my-machine-have).

