In [1]:
def shape(data):
    num_rows = data.count()
    num_columns = len(data.columns)
    print(f"Shape: ({num_rows}, {num_columns})")

In [2]:
import warnings
warnings.filterwarnings("ignore") # Ignores all warnings

In [3]:
import findspark
findspark.init()

import pyspark

In [4]:
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/04 22:19:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
from pyspark import SparkFiles
sc.addFile('https://raw.githubusercontent.com/zsu020958/data_base/main/Python/cleaning_data_in_pyspark/DallasCouncilVoters.csv')

# dataset
voters_df = spark.read.format('csv') \
    .options(header=True) \
    .options(inferSchema=True) \
    .load(SparkFiles.get('DallasCouncilVoters.csv'))

In [6]:
voters_df.printSchema()

root
 |-- DATE: string (nullable = true)
 |-- TITLE: string (nullable = true)
 |-- VOTER_NAME: string (nullable = true)



In [7]:
shape(voters_df)

Shape: (44625, 3)


In [8]:
voters_df.show(3)

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
+----------+-------------+-------------------+
only showing top 3 rows



In [9]:
# ### Check NaN, Null
from pyspark.sql.functions import count, when, isnan, col

In [10]:
voters_df.select([count(when(isnan(c), c)).alias(c) for c in voters_df.columns]).toPandas().T

Unnamed: 0,0
DATE,0
TITLE,0
VOTER_NAME,0


In [11]:
voters_df.select([count(when(col(c).isNull(), c)).alias(c) for c in voters_df.columns]).toPandas().T

Unnamed: 0,0
DATE,0
TITLE,195
VOTER_NAME,503


- Got null >> delete null

In [12]:
voters_df = voters_df.dropna(subset='VOTER_NAME')
voters_df.select([count(when(col(c).isNull(), c)).alias(c) for c in voters_df.columns]).toPandas().T


Unnamed: 0,0
DATE,0
TITLE,0
VOTER_NAME,0


In [13]:
num_rows = voters_df.count()
num_dist_rows = voters_df.distinct().count()
dup_rows = num_rows - num_dist_rows
display(num_rows, num_dist_rows, dup_rows)

44122

1273

42849

In [14]:
# check duplicates
voters_df.filter(voters_df['VOTER_NAME'] == 'Philip T. Kingston').show(5)


+----------+-------------+------------------+
|      DATE|        TITLE|        VOTER_NAME|
+----------+-------------+------------------+
|02/08/2017|Councilmember|Philip T. Kingston|
|02/08/2017|Councilmember|Philip T. Kingston|
|01/11/2017|Councilmember|Philip T. Kingston|
|09/14/2016|Councilmember|Philip T. Kingston|
|01/04/2017|Councilmember|Philip T. Kingston|
+----------+-------------+------------------+
only showing top 5 rows



In [15]:
voters_df = voters_df.drop_duplicates()
voters_df.count()

1273

In [16]:
# show the distinct VOTER_NAME entries
voters_df.select(voters_df['VOTER_NAME']).distinct().show(10)

+--------------------+
|          VOTER_NAME|
+--------------------+
|      Tennell Atkins|
|  the  final   20...|
|        Scott Griggs|
|       Scott  Griggs|
|       Sandy Greyson|
| Michael S. Rawlings|
| the final 2018 A...|
|        Kevin Felder|
|        Adam Medrano|
|       Casey  Thomas|
+--------------------+
only showing top 10 rows



In [17]:
# filter voter_df where the VOTER_NAME is 1-20 characters in length
voters_df = voters_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')
voters_df.show(5)

+----------+--------------------+------------------+
|      DATE|               TITLE|        VOTER_NAME|
+----------+--------------------+------------------+
|04/11/2018|Deputy Mayor Pro Tem|      Adam Medrano|
|02/14/2018|       Councilmember|   Lee M. Kleinman|
|04/25/2018|       Councilmember|    Tennell Atkins|
|08/29/2018|       Councilmember|      Kevin Felder|
|10/18/2017|       Councilmember|Jennifer S.  Gates|
+----------+--------------------+------------------+
only showing top 5 rows



In [18]:
# filter out voter_df where the VOTER_NAME contains an underscore (remove)
voters_df = voters_df.filter(~ col('VOTER_NAME').contains('_'))

In [19]:
# show the distinct VOTER_NAME entries again
voters_df.select('VOTER_NAME').distinct().show(10, truncate=False)

+-------------------+
|VOTER_NAME         |
+-------------------+
|Tennell Atkins     |
|Scott Griggs       |
|Scott  Griggs      |
|Sandy Greyson      |
|Michael S. Rawlings|
|Kevin Felder       |
|Adam Medrano       |
|Casey  Thomas      |
|Mark  Clayton      |
|Casey Thomas       |
+-------------------+
only showing top 10 rows



### Modifying DataFrame

In [20]:
from pyspark.sql.functions import *

In [21]:
# add a new column called splits separated om whitespace
voters_df = voters_df.withColumn('splits', split(voters_df.VOTER_NAME, '\s+'))

In [22]:
voters_df.show(5)

+----------+--------------------+------------------+--------------------+
|      DATE|               TITLE|        VOTER_NAME|              splits|
+----------+--------------------+------------------+--------------------+
|04/11/2018|Deputy Mayor Pro Tem|      Adam Medrano|     [Adam, Medrano]|
|02/14/2018|       Councilmember|   Lee M. Kleinman| [Lee, M., Kleinman]|
|04/25/2018|       Councilmember|    Tennell Atkins|   [Tennell, Atkins]|
|08/29/2018|       Councilmember|      Kevin Felder|     [Kevin, Felder]|
|10/18/2017|       Councilmember|Jennifer S.  Gates|[Jennifer, S., Ga...|
+----------+--------------------+------------------+--------------------+
only showing top 5 rows



In [23]:
# create a new column called first name based on the first item in splits
voters_df = voters_df.withColumn('first_name', voters_df.splits.getItem(0))
voters_df.show(2)

+----------+--------------------+---------------+-------------------+----------+
|      DATE|               TITLE|     VOTER_NAME|             splits|first_name|
+----------+--------------------+---------------+-------------------+----------+
|04/11/2018|Deputy Mayor Pro Tem|   Adam Medrano|    [Adam, Medrano]|      Adam|
|02/14/2018|       Councilmember|Lee M. Kleinman|[Lee, M., Kleinman]|       Lee|
+----------+--------------------+---------------+-------------------+----------+
only showing top 2 rows



In [24]:
# get the last entry of the splits list and create a column called last name
voters_df = voters_df.withColumn('last_name', voters_df.splits.getItem(size('splits') - 1))
voters_df.show(2)

+----------+--------------------+---------------+-------------------+----------+---------+
|      DATE|               TITLE|     VOTER_NAME|             splits|first_name|last_name|
+----------+--------------------+---------------+-------------------+----------+---------+
|04/11/2018|Deputy Mayor Pro Tem|   Adam Medrano|    [Adam, Medrano]|      Adam|  Medrano|
|02/14/2018|       Councilmember|Lee M. Kleinman|[Lee, M., Kleinman]|       Lee| Kleinman|
+----------+--------------------+---------------+-------------------+----------+---------+
only showing top 2 rows



In [25]:
# drop the splits column
#voters_df = voters_df.drop('splits')

In [26]:
# add a column to voter_df for any voter with the title 'Councilmember'
voters_df = voters_df.withColumn('random_val', when(voters_df.TITLE == 'Councilmember', rand()))
voters_df.show(5) 

+----------+--------------------+------------------+--------------------+----------+---------+-------------------+
|      DATE|               TITLE|        VOTER_NAME|              splits|first_name|last_name|         random_val|
+----------+--------------------+------------------+--------------------+----------+---------+-------------------+
|04/11/2018|Deputy Mayor Pro Tem|      Adam Medrano|     [Adam, Medrano]|      Adam|  Medrano|               NULL|
|02/14/2018|       Councilmember|   Lee M. Kleinman| [Lee, M., Kleinman]|       Lee| Kleinman|  0.414099294404493|
|04/25/2018|       Councilmember|    Tennell Atkins|   [Tennell, Atkins]|   Tennell|   Atkins|  0.964957820832375|
|08/29/2018|       Councilmember|      Kevin Felder|     [Kevin, Felder]|     Kevin|   Felder| 0.6088138321116783|
|10/18/2017|       Councilmember|Jennifer S.  Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|0.27300987908556695|
+----------+--------------------+------------------+--------------------+-------

In [27]:
# add a column to voters_df for a voter based on their position
voters_df = voters_df.withColumn('random_val', 
                                 when(voters_df.TITLE == 'Councilmember', rand())
                                 .when(voters_df.TITLE == 'Mayor', 2)
                                 .otherwise(0))

In [28]:
voters_df.show(5)

+----------+--------------------+------------------+--------------------+----------+---------+-------------------+
|      DATE|               TITLE|        VOTER_NAME|              splits|first_name|last_name|         random_val|
+----------+--------------------+------------------+--------------------+----------+---------+-------------------+
|04/11/2018|Deputy Mayor Pro Tem|      Adam Medrano|     [Adam, Medrano]|      Adam|  Medrano|                0.0|
|02/14/2018|       Councilmember|   Lee M. Kleinman| [Lee, M., Kleinman]|       Lee| Kleinman|0.27704564862852665|
|04/25/2018|       Councilmember|    Tennell Atkins|   [Tennell, Atkins]|   Tennell|   Atkins|0.48321996392774413|
|08/29/2018|       Councilmember|      Kevin Felder|     [Kevin, Felder]|     Kevin|   Felder|0.02436788692368619|
|10/18/2017|       Councilmember|Jennifer S.  Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|0.22666829644174258|
+----------+--------------------+------------------+--------------------+-------

In [29]:
voters_df.filter(voters_df.random_val == 0).show(5)

+----------+--------------------+-----------------+--------------------+----------+---------+----------+
|      DATE|               TITLE|       VOTER_NAME|              splits|first_name|last_name|random_val|
+----------+--------------------+-----------------+--------------------+----------+---------+----------+
|04/11/2018|Deputy Mayor Pro Tem|     Adam Medrano|     [Adam, Medrano]|      Adam|  Medrano|       0.0|
|04/12/2017|       Mayor Pro Tem| Monica R. Alonzo|[Monica, R., Alonzo]|    Monica|   Alonzo|       0.0|
|06/28/2017|Deputy Mayor Pro Tem|     Adam Medrano|     [Adam, Medrano]|      Adam|  Medrano|       0.0|
|01/03/2018|Deputy Mayor Pro Tem|     Adam Medrano|     [Adam, Medrano]|      Adam|  Medrano|       0.0|
|01/17/2018|       Mayor Pro Tem|Dwaine R. Caraway|[Dwaine, R., Cara...|    Dwaine|  Caraway|       0.0|
+----------+--------------------+-----------------+--------------------+----------+---------+----------+
only showing top 5 rows



### UDF

In [30]:
from pyspark.sql.types import *

In [31]:
def getFirstAndMiddle(names):
    # return a space separated string of names
    return ' '.join(names[:-1])

In [32]:
# define the method as a UDF
udfFirstAndMiddle = udf(getFirstAndMiddle, StringType())

# create a new column using UDF
voters_df = voters_df.withColumn('first_and_midle_name', udfFirstAndMiddle(voters_df.splits))

In [33]:
# drop the unnecessary columns then show the DataFrame
voters_df = voters_df.drop('first_name')
voters_df = voters_df.drop('splits')
voters_df.show(5)

+----------+--------------------+------------------+---------+-------------------+--------------------+
|      DATE|               TITLE|        VOTER_NAME|last_name|         random_val|first_and_midle_name|
+----------+--------------------+------------------+---------+-------------------+--------------------+
|04/11/2018|Deputy Mayor Pro Tem|      Adam Medrano|  Medrano|                0.0|                Adam|
|02/14/2018|       Councilmember|   Lee M. Kleinman| Kleinman|0.27704564862852665|              Lee M.|
|04/25/2018|       Councilmember|    Tennell Atkins|   Atkins|0.48321996392774413|             Tennell|
|08/29/2018|       Councilmember|      Kevin Felder|   Felder|0.02436788692368619|               Kevin|
|10/18/2017|       Councilmember|Jennifer S.  Gates|    Gates|0.22666829644174258|         Jennifer S.|
+----------+--------------------+------------------+---------+-------------------+--------------------+
only showing top 5 rows



### Adding an ID Field

In [34]:
# select all the unique council voters
voters_df = voters_df.select(voters_df["VOTER_NAME"]).distinct()

# count the rows in voters_df
print('\nThere are %d rows in the voters_df DataFrame.\n' % voters_df.count())


There are 27 rows in the voters_df DataFrame.



In [35]:
from pyspark.sql.functions import monotonically_increasing_id

In [36]:
# add a ROW_ID
voters_df = voters_df.withColumn('ROW_ID', monotonically_increasing_id())

In [37]:
voters_df.show()

+-------------------+------+
|         VOTER_NAME|ROW_ID|
+-------------------+------+
|     Tennell Atkins|     0|
|       Scott Griggs|     1|
|      Scott  Griggs|     2|
|      Sandy Greyson|     3|
|Michael S. Rawlings|     4|
|       Kevin Felder|     5|
|       Adam Medrano|     6|
|      Casey  Thomas|     7|
|      Mark  Clayton|     8|
|       Casey Thomas|     9|
|     Sandy  Greyson|    10|
|       Mark Clayton|    11|
| Jennifer S.  Gates|    12|
|  Tiffinni A. Young|    13|
|   B. Adam  McGough|    14|
|       Omar Narvaez|    15|
| Philip T. Kingston|    16|
| Rickey D. Callahan|    17|
|  Dwaine R. Caraway|    18|
|Philip T.  Kingston|    19|
+-------------------+------+
only showing top 20 rows



In [38]:
# show the rows with 10 highest IDs in the set
voters_df.orderBy(voters_df.ROW_ID.desc()).show(10)

+-------------------+------+
|         VOTER_NAME|ROW_ID|
+-------------------+------+
|       Lee Kleinman|    26|
|        Erik Wilson|    25|
|Carolyn King Arnold|    24|
|Rickey D.  Callahan|    23|
|   Monica R. Alonzo|    22|
|    Lee M. Kleinman|    21|
|  Jennifer S. Gates|    20|
|Philip T.  Kingston|    19|
|  Dwaine R. Caraway|    18|
| Rickey D. Callahan|    17|
+-------------------+------+
only showing top 10 rows



### IDs with different partitions

In [39]:
# print the number of partitions in each DataFrame
print('\nThere are %d partitions in the voters_df DataFrame.\n' % voters_df.rdd.getNumPartitions())


There are 1 partitions in the voters_df DataFrame.



In [40]:
# determine the highest ROW_ID and save it in previous_max_ID
previous_max_ID = voters_df.select('ROW_ID').rdd.max()[0]

# add a ROW_ID column to voter_df_april  starting at the desired value
voter_df_april = voters_df.withColumn('ROW_ID', 
                                      monotonically_increasing_id() + previous_max_ID)

In [41]:
# show the ROW_ID from both DataFrames and compare
voters_df.select('ROW_ID').show(5)
voter_df_april.select('ROW_ID').show(5)

+------+
|ROW_ID|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
+------+
only showing top 5 rows

+------+
|ROW_ID|
+------+
|    26|
|    27|
|    28|
|    29|
|    30|
+------+
only showing top 5 rows



25/07/04 22:19:31 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
