This repository has been archived by the owner on May 18, 2023. It is now read-only.

Commit

Merge pull request #6 from moj-analytical-services/nulloutvalues
Nulloutvalues function added
mamonu committed Dec 22, 2020
2 parents a620d11 + e62d440 commit 355b817
Showing 4 changed files with 105 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .gitignore
@@ -136,3 +136,6 @@ dmypy.json

# Cython debug symbols
cython_debug/

spark-warehouse/

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "splink_data_standardisation"
version = "0.2.1"
version = "0.2.2"
description = ""
authors = ["Robin Linacre <robin.linacre@digital.justice.gov.uk>"]
license = "MIT"
34 changes: 34 additions & 0 deletions splink_data_standardisation/remove_anomalies.py
@@ -0,0 +1,34 @@
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.session import SparkSession
from pyspark.sql import functions as f


def null_out_values(df: DataFrame, colname: str, values_to_null: list):
    """Null out a list of undesirable values in a column.

    Useful for columns that mostly contain valid data but occasionally
    contain other values such as 'unknown'.

    Args:
        df (DataFrame): The dataframe to clean
        colname (str): The name of the column to clean
        values_to_null (list): A list of values to be nulled

    Returns:
        DataFrame: The cleaned dataframe with the incoming column overwritten
    """

    # Nothing to null out, so return the dataframe unchanged
    if len(values_to_null) == 0:
        return df

    # Quote each value so it can appear as a SQL string literal.
    # Assumes the values do not themselves contain double quotes.
    values_to_null_string = [f'"{v}"' for v in values_to_null]
    values_to_null_joined = ", ".join(values_to_null_string)

    # e.g. for colname = "mycol" and values_to_null = ["unknown"] this builds:
    # CASE WHEN mycol in ("unknown") THEN NULL ELSE mycol END
    case_statement = f"""
    CASE
    WHEN {colname} in ({values_to_null_joined}) THEN NULL
    ELSE {colname}
    END
    """

    # Overwrite the original column with the cleaned values
    df = df.withColumn(colname, f.expr(case_statement))

    return df
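
For reference, a minimal usage sketch of the new function (not part of this commit; the SparkSession setup, the "postcode" column and the placeholder values are hypothetical):

from pyspark.sql import SparkSession
from splink_data_standardisation.remove_anomalies import null_out_values

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, "SW1A 1AA"), (2, "unknown"), (3, "-")],
    ["id", "postcode"],
)

# Replace placeholder strings with proper nulls
df_clean = null_out_values(df, "postcode", ["unknown", "-"])
df_clean.show()
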
67 changes: 67 additions & 0 deletions tests/test_remove_anomalies.py
@@ -0,0 +1,67 @@
import pytest
import pandas as pd

from splink_data_standardisation.remove_anomalies import null_out_values
from pyspark.sql import Row


def test_null_out_vals_0(spark):

    data_list = [
        {"id": 1, "mycol": "A"},
        {"id": 2, "mycol": "B"},
        {"id": 3, "mycol": "B"},
        {"id": 4, "mycol": "C"},
        {"id": 5, "mycol": "C"},
    ]

    # An empty list of values to null should leave the dataframe unchanged
    garbagevals = []

    df = spark.createDataFrame(Row(**x) for x in data_list)

    df = null_out_values(df, "mycol", garbagevals)

    df_result = df.toPandas()

    df_expected = [
        {"id": 1, "mycol": "A"},
        {"id": 2, "mycol": "B"},
        {"id": 3, "mycol": "B"},
        {"id": 4, "mycol": "C"},
        {"id": 5, "mycol": "C"},
    ]

    df_expected = pd.DataFrame(df_expected)

    pd.testing.assert_frame_equal(df_result, df_expected)


def test_null_out_vals_1(spark):

    data_list = [
        {"id": 1, "mycol": "A"},
        {"id": 2, "mycol": "B"},
        {"id": 3, "mycol": "B"},
        {"id": 4, "mycol": "C"},
        {"id": 5, "mycol": "C"},
    ]

    # Every occurrence of "C" should be replaced with a null
    garbagevals = ["C"]

    df = spark.createDataFrame(Row(**x) for x in data_list)

    df = null_out_values(df, "mycol", garbagevals)

    df_result = df.toPandas()

    df_expected = [
        {"id": 1, "mycol": "A"},
        {"id": 2, "mycol": "B"},
        {"id": 3, "mycol": "B"},
        {"id": 4, "mycol": None},
        {"id": 5, "mycol": None},
    ]

    df_expected = pd.DataFrame(df_expected)

    pd.testing.assert_frame_equal(df_result, df_expected)
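
The tests take a spark fixture that is defined elsewhere in the repository (e.g. in a conftest.py, which this commit does not touch). A minimal sketch of what such a fixture could look like, assuming a local Spark session is sufficient for the test suite:

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    # Local, single-threaded Spark session shared across the test session
    spark = (
        SparkSession.builder.master("local[1]")
        .appName("splink_data_standardisation_tests")
        .getOrCreate()
    )
    yield spark
    spark.stop()
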

