In [1]:
import re
import numpy as np
from datetime import datetime

import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

First we will setup a spark context, load our csvs from the filesystem (whether it's just our local example from the pandas pipe or a giant dataset stored in hdfs). We will merge the two files into one dataframe object.

In [2]:
s = SparkSession.builder.appName("my_app_name").getOrCreate()
s.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/22 16:27:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df1 = s.read.csv('data1.csv', header=True, inferSchema=True)
df2 = s.read.csv('data2.csv', header=True, inferSchema=True)

In [4]:
df = df1.unionByName(df2, allowMissingColumns=True)
df = df.withColumn("index", F.monotonically_increasing_id())

In [5]:
print((df.count(), len(df.columns)))

(200, 13)


In [6]:
df.drop("index").show()

+---------+-------+---------+---------+----+------+------+------+--------+----+-----+----+
|SymxUUBCx|RcKB398|uNQMYeBmQ|lOihuqh9a|pIkq|21rkd6|yLdLOm|CLrQTb|LFThQKIw|sYef|00L3k| OjQ|
+---------+-------+---------+---------+----+------+------+------+--------+----+-----+----+
|       51|      2|       37|     null|null|  null|  null|  null|    null|null| null|null|
|       83|     88|       38|     null|null|  null|  null|  null|    null|null| null|null|
|       40|     30|       25|     null|null|  null|  null|  null|    null|null| null|null|
|       27|     52|       66|     null|null|  null|  null|  null|    null|null| null|null|
|       65|     43|       57|     null|null|  null|  null|  null|    null|null| null|null|
|       59|     14|       93|     null|null|  null|  null|  null|    null|null| null|null|
|       28|     84|       20|     null|null|  null|  null|  null|    null|null| null|null|
|       51|      1|       52|     null|null|  null|  null|  null|    null|null| null|null|

In [7]:
df.orderBy(F.desc("index")).drop("index").show(20)

+---------+-------+---------+---------+----+------+------+------+--------+-----+-----+----------+
|SymxUUBCx|RcKB398|uNQMYeBmQ|lOihuqh9a|pIkq|21rkd6|yLdLOm|CLrQTb|LFThQKIw| sYef|00L3k|       OjQ|
+---------+-------+---------+---------+----+------+------+------+--------+-----+-----+----------+
|     null|     30|       57|       49|  34|    83|    26|    43|       0|  nan| 50.0|2020/02/01|
|     null|     82|       68|       92|  68|    43|    76|    61|       1|abc23| 25.0|2020/02/01|
|     null|      8|       92|       51|  37|    23|    46|    86|       1|abc23| 10.0|2020/02/01|
|     null|     13|        7|       89|  81|    52|    86|    87|       0|   34| 25.0|2020/02/01|
|     null|     27|       65|       20|  37|    29|    73|    70|       0|abc23| 10.0|2020/02/01|
|     null|     49|       56|       42|  88|    49|    79|    11|       1| null| 10.0|2020/02/01|
|     null|     46|       46|       84|  75|    97|    92|    11|       1|   86| 50.0|2020/02/01|
|     null|     11| 

In [8]:
df.dtypes

[('SymxUUBCx', 'int'),
 ('RcKB398', 'int'),
 ('uNQMYeBmQ', 'int'),
 ('lOihuqh9a', 'int'),
 ('pIkq', 'int'),
 ('21rkd6', 'int'),
 ('yLdLOm', 'int'),
 ('CLrQTb', 'int'),
 ('LFThQKIw', 'int'),
 ('sYef', 'string'),
 ('00L3k', 'double'),
 ('OjQ', 'string'),
 ('index', 'bigint')]

In [9]:
col1, col2, col3, col4, col5 = df.columns[1], df.columns[8], df.columns[9], df.columns[10], df.columns[11]
(col1, col2, col3, col4, col5)

('RcKB398', 'LFThQKIw', 'sYef', '00L3k', 'OjQ')

In [10]:
#F.regexp_replace(F.col(col5), data_regex, '').isNotNull()

In [11]:
# simple transform
df = df.withColumn(col1, F.col(col1)*5)

# using when to set two different values 
df = df.withColumn(col2, F.when(F.col(col2) == 1, True).otherwise(False))

# only casting to float if we can bring the entire column
row_ct = df.count()
rowswith_col3float = df.filter(F.col(col3).cast("float").isNotNull()).count()
if row_ct == rowswith_col3float:
    df = df.withColumn(col3, F.col(col3).cast("float"))

# using a regex to clean/remove anything non numerical in any string,
# and then do the cast to float for non null (empty str) values
df = df.withColumn(col3, F.regexp_replace(F.col(col3), r"[^0-9\\.]", '').cast("double"))

# using a custom function to convert col4 back to int
# note the necessary redundant logic to process null
def floatint(x):
    return int(float(x))
int_udf = F.udf(lambda m: None if m is None else floatint(m))
df = df.withColumn(col4, F.when(F.col(col4).isNotNull(), int_udf(F.col(col4))).otherwise(None))
# df = df.withColumn(col4, int_udf(F.col(col4))) # can also just do a udf without when

# same, using a regex for matching dates
data_regex = r"\d{2,4}(\.|\-|\/|\\)+\d{2,4}(\.|\-|\/|\\)+\d{2,4}(\s)*(\d{2}\:\d{2}\:\d{2})?(\.\d{3})?|^$"
df = df.withColumn(col5, F.when(F.regexp_replace(F.col(col5), data_regex, '').isNotNull(),\
                                F.to_timestamp(F.col(col5), 'yyyy/MM/dd')).otherwise(None))
# df = df.withColumn(col5, F.to_timestamp(F.col(col5), 'yyyy/MM/dd')) # simpler alternative

# replace and nan in the df with nulls
df = df.replace(np.nan, None)

In [12]:
df.drop("index").show()

+---------+-------+---------+---------+----+------+------+------+--------+----+-----+----+
|SymxUUBCx|RcKB398|uNQMYeBmQ|lOihuqh9a|pIkq|21rkd6|yLdLOm|CLrQTb|LFThQKIw|sYef|00L3k| OjQ|
+---------+-------+---------+---------+----+------+------+------+--------+----+-----+----+
|       51|     10|       37|     null|null|  null|  null|  null|   false|null| null|null|
|       83|    440|       38|     null|null|  null|  null|  null|   false|null| null|null|
|       40|    150|       25|     null|null|  null|  null|  null|   false|null| null|null|
|       27|    260|       66|     null|null|  null|  null|  null|   false|null| null|null|
|       65|    215|       57|     null|null|  null|  null|  null|   false|null| null|null|
|       59|     70|       93|     null|null|  null|  null|  null|   false|null| null|null|
|       28|    420|       20|     null|null|  null|  null|  null|   false|null| null|null|
|       51|      5|       52|     null|null|  null|  null|  null|   false|null| null|null|

In [13]:
# the last 20 rows
df.orderBy(F.desc("index")).drop("index").show(20)

+---------+-------+---------+---------+----+------+------+------+--------+----+-----+-------------------+
|SymxUUBCx|RcKB398|uNQMYeBmQ|lOihuqh9a|pIkq|21rkd6|yLdLOm|CLrQTb|LFThQKIw|sYef|00L3k|                OjQ|
+---------+-------+---------+---------+----+------+------+------+--------+----+-----+-------------------+
|     null|    150|       57|       49|  34|    83|    26|    43|   false|null|   50|2020-02-01 00:00:00|
|     null|    410|       68|       92|  68|    43|    76|    61|    true|23.0|   25|2020-02-01 00:00:00|
|     null|     40|       92|       51|  37|    23|    46|    86|    true|23.0|   10|2020-02-01 00:00:00|
|     null|     65|        7|       89|  81|    52|    86|    87|   false|34.0|   25|2020-02-01 00:00:00|
|     null|    135|       65|       20|  37|    29|    73|    70|   false|23.0|   10|2020-02-01 00:00:00|
|     null|    245|       56|       42|  88|    49|    79|    11|    true|null|   10|2020-02-01 00:00:00|
|     null|    230|       46|       84|  75|  

In [14]:
df.dtypes

[('SymxUUBCx', 'int'),
 ('RcKB398', 'int'),
 ('uNQMYeBmQ', 'int'),
 ('lOihuqh9a', 'int'),
 ('pIkq', 'int'),
 ('21rkd6', 'int'),
 ('yLdLOm', 'int'),
 ('CLrQTb', 'int'),
 ('LFThQKIw', 'boolean'),
 ('sYef', 'double'),
 ('00L3k', 'string'),
 ('OjQ', 'timestamp'),
 ('index', 'bigint')]

You can see the same kinds of transforms are avaialable in pyspark and you can do most of what you'd do with a dataframe. There's also ways to do this outside the dataframe api with a raw rdd but that's beyond the scope of what I wanted to show here.

# This example was simple but your data problems may be complex.

# For any of your complex data problems I'm available to hire on contract to help you build and scale whatever data decision engine you need for your business. 

# Reach out by messaging inquire@automatedinnovations.net