In [1]:
import pyspark
import pyspark.sql.functions as F

# Start (or reuse) a Spark session that runs against the 'local' master.
spark = (
    pyspark.sql.SparkSession.builder
    .master('local')
    .getOrCreate()
)

In [2]:
# def config_spark():
#     """
#     Configure Spark
#     """
#     import pyspark
#     import pyspark.sql.functions as F
#     import pyspark.sql.types as T
#     spark = pyspark.sql.SparkSession.builder.master('local').getOrCreate()
    

def to_pandas(sparkDf, n=10):
    """
    Returns a Pandas dataframe holding the top n rows of a Spark dataframe

    Parameters
    ----------
    sparkDf: pyspark.sql.dataframe.DataFrame
        One Spark dataframe
    n: int / float
        Top n rows. float('inf') converts every row; any other float is
        truncated to an int because limit() only accepts an integer

    Returns
    -------
    A Pandas dataframe
    """
    # An infinite n means "no limit"; limit() itself cannot take it.
    if n == float('inf'):
        return sparkDf.toPandas()

    return sparkDf.limit(int(n)).toPandas()


def group_count_percent(sparkDf, cols, n=10, dfType='pandas'):
    """
    Group by column(s), count the rows of each group, add the share of each
    group as a percentage of all rows, and sort by count in descending order

    Parameters
    ----------
    sparkDf: pyspark.sql.dataframe.DataFrame
        One Spark dataframe
    cols: str / list
        Accepts a (list of) string(s) of column(s)
    n: int / float
        Top n rows. float('inf') keeps every group; any other float is
        truncated to an int because limit() only accepts an integer
    dfType: str
        'pandas' (default) or 'spark', matched case-insensitively

    Returns
    -------
    A Pandas dataframe when dfType is 'pandas', a Spark dataframe when it is
    'spark'. Columns are the grouping column(s), 'count' and 'percent'

    Raises
    ------
    ValueError
        If dfType is neither 'pandas' nor 'spark'
    """
    outputType = str(dfType).casefold()
    if outputType not in ('pandas', 'spark'):
        # Fail loudly instead of silently returning None for a typo.
        raise ValueError(
            "dfType must be 'pandas' or 'spark', got {0!r}".format(dfType))

    rowCount = sparkDf.count()
    df = sparkDf.groupBy(cols).count().orderBy('count', ascending=False)

    # Native column arithmetic instead of a Python UDF: the UDF had no
    # declared return type and shipped every row through the Python worker.
    df = df.withColumn('percent', F.round(F.col('count') * 100 / rowCount, 3))

    if n != float('inf'):
        df = df.limit(int(n))

    if outputType == 'pandas':
        return df.toPandas()

    return df


def info(sparkDf):
    """
    Print a short summary of a Spark dataframe: its type, the number of
    columns, the number of rows, and finally its schema

    Parameters
    ----------
    sparkDf: pyspark.sql.dataframe.DataFrame
    """
    # Each value is computed lazily, right before its line is printed.
    report = (
        ('The dataframe: {0}', lambda: type(sparkDf)),
        ('Number of columns: {0}', lambda: len(sparkDf.columns)),
        ('Number of rows: {0}', lambda: sparkDf.count()),
    )
    for template, compute in report:
        print(template.format(compute()))

    sparkDf.printSchema()


def rename_columns(sparkDf, cols):
    """
    Rename some columns of a Spark dataframe, keeping the column order

    Parameters
    ----------
    sparkDf: pyspark.sql.dataframe.DataFrame
        One Spark dataframe
    cols: dict
        A dictionary {oldName: newName} of columns to rename

    Returns
    -------
    A Spark dataframe
    """
    aliased = []
    for current in sparkDf.columns:
        column = F.col(current)
        # Columns missing from the mapping are aliased to their own name.
        aliased.append(column.alias(cols.get(current, current)))

    return sparkDf.select(aliased)


def _is_null_like(value):
    """Return True for None and for blank / 'none' / 'null' strings (any case)."""
    if value is None:
        return True
    if isinstance(value, str):
        return value.strip().casefold() in ('', 'none', 'null')
    # Non-string values such as 0 or False are real data, not nulls.
    return False


def columns_statistics(sparkDf, n=10):
    """
    Display Spark dataframe columns' statistics and return 2 lists

    For every column the value counts and percentages are shown. A column
    holding exactly one distinct value is reported as a candidate to drop;
    if that value is None or a blank / 'none' / 'null' string (any case)
    the column is also reported as a null-value column.

    Parameters
    ----------
    sparkDf: pyspark.sql.dataframe.DataFrame
        One Spark dataframe
    n: int / float
        Top n rows to display per column; float('inf') displays all of them

    Returns
    -------
    Lists of null-value and single-value columns. null-value list <= single-value list
    """
    info(sparkDf)
    nullValueCols, singleValueCols = [], []

    for column in sparkDf.columns:
        # Group without a row limit: n only controls how much is displayed,
        # it must not cap the distinct-value count used below.
        df = group_count_percent(sparkDf=sparkDf, cols=column, n=float('inf'), dfType='spark')
        distinctCount = df.count()

        print(column)
        df.show(n=distinctCount if n == float('inf') else int(n))

        if distinctCount == 1:
            singleValueCols.append(column)
            print('!!!!! {0} is a candidate to drop !!!!!\n\n'.format(column))

            # first() runs a Spark job, so fetch the single value only once.
            if _is_null_like(df.first()[0]):
                nullValueCols.append(column)

    print('There are {0} of single value columns, they are: {1}'.format(len(singleValueCols), singleValueCols))
    print('There are {0} of null value columns, they are: {1}'.format(len(nullValueCols), nullValueCols))
    return nullValueCols, singleValueCols


def column_into_list(sparkDf, singleCol):
    """
    Convert a Spark dataframe's column into list

    The column is collected through Pandas, so the values in the list are
    whatever toPandas() produces for that column.

    Parameters
    ----------
    sparkDf: pyspark.sql.dataframe.DataFrame
        One Spark dataframe
    singleCol: str
        One column in sparkDf

    Returns
    -------
    A list

    Raises
    ------
    ValueError
        If singleCol is not a column of sparkDf
    """
    if singleCol not in sparkDf.columns:
        # Explicit error instead of an implicit None, which callers such as
        # column_into_set cannot iterate.
        raise ValueError(
            'Column {0!r} not found in dataframe columns {1}'.format(singleCol, sparkDf.columns))

    return sparkDf.select(singleCol).toPandas()[singleCol].values.tolist()


def column_into_set(sparkDf, singleCol):
    """
    Collect the distinct values of one Spark dataframe column into a set

    Parameters
    ----------
    sparkDf: pyspark.sql.dataframe.DataFrame
        One Spark dataframe
    singleCol: str
        One column in sparkDf

    Returns
    -------
    A set
    """
    # Delegate the collection to column_into_list, then drop duplicates.
    values = column_into_list(sparkDf, singleCol)
    return set(values)


def prefix_to_columns(sparkDf, prefix):
    """
    Add a prefix to the Spark dataframe's column names

    Columns whose name already starts with the prefix are left unchanged.

    Parameters
    ----------
    sparkDf: pyspark.sql.dataframe.DataFrame
        One Spark dataframe
    prefix: str
        Prefix

    Returns
    -------
    A Spark dataframe
    """
    unprefixed = [name for name in sparkDf.columns if not name.startswith(prefix)]

    renamed = sparkDf
    for name in unprefixed:
        renamed = renamed.withColumnRenamed(name, prefix + name)

    return renamed


def add_dummy_columns(sparkDf, cols, value):
    """
    Add dummy column(s) to Spark dataframe

    Only the names in cols that are not already columns of sparkDf are
    added. They are appended in the order given in cols, duplicates ignored.

    Parameters
    ----------
    sparkDf: pyspark.sql.dataframe.DataFrame
        One Spark dataframe
    cols: str / list
        A column name or a list of column names
    value: str
        Default value of the new column(s)

    Returns
    -------
    A Spark dataframe
    """
    # A bare string would otherwise be split into one column per character.
    if isinstance(cols, str):
        cols = [cols]

    existing = set(sparkDf.columns)
    # dict.fromkeys de-duplicates while keeping the caller's order; a plain
    # set difference would append the columns in an arbitrary order.
    dummyCols = [column for column in dict.fromkeys(cols) if column not in existing]

    df = sparkDf
    for column in dummyCols:
        df = df.withColumn(column, F.lit(value))

    return df

In [3]:
import pandas as pd

# Small demo frame mixing integers, floats, list-valued columns and strings.
df_pd = pd.DataFrame({
    'integers': [1, 2, 3],
    'floats': [-1.0, 0.5, 2.7],
    'integer_array': [[1, 2], [3, 4, 5], [6, 7, 8, 9]],
    'str_array': [[], ['a'], ['a','b']],
    # Scalar strings: the same value is repeated on every row.
    'literal_str_array': "[[], ['a'], ['a','b']]",
    'literal_str_array2': '"[[], [a], [a,b]]"',
    'strs': ['null', '', None],
})

df = spark.createDataFrame(df_pd)
to_pandas(df)

Unnamed: 0,integers,floats,integer_array,str_array,literal_str_array,literal_str_array2,strs
0,1,-1.0,"[1, 2]",[],"[[], ['a'], ['a','b']]","""[[], [a], [a,b]]""",
1,2,0.5,"[3, 4, 5]",[a],"[[], ['a'], ['a','b']]","""[[], [a], [a,b]]""",
2,3,2.7,"[6, 7, 8, 9]","[a, b]","[[], ['a'], ['a','b']]","""[[], [a], [a,b]]""",


In [4]:
# Mapping of current column name -> new column name.
colsToRename = {
    'strs': 'test',
    'integers': 'integer',
}

to_pandas(rename_columns(sparkDf=df, cols=colsToRename))

Unnamed: 0,integer,floats,integer_array,str_array,literal_str_array,literal_str_array2,test
0,1,-1.0,"[1, 2]",[],"[[], ['a'], ['a','b']]","""[[], [a], [a,b]]""",
1,2,0.5,"[3, 4, 5]",[a],"[[], ['a'], ['a','b']]","""[[], [a], [a,b]]""",
2,3,2.7,"[6, 7, 8, 9]","[a, b]","[[], ['a'], ['a','b']]","""[[], [a], [a,b]]""",


In [5]:
# IPython help lookup: shows the docstring of group_count_percent.
?group_count_percent

In [6]:
# Scratch check of null-like spellings. Each assignment overwrites the
# previous one, so only the last value ('Null') is evaluated below.
x = None
x = 'None'
x = 'null'
x = 'Null'

# A non-empty string is truthy, so `not x` is False for 'Null'.
not x

False

In [7]:
# Same scratch values; only the final assignment ('Null') is in effect.
x = None
x = 'None'
x = 'null'
x = 'Null'

# Only the last expression of the cell is displayed, so the result of the
# 'none' check is discarded and the output reflects the 'null' check.
'none' in x.casefold()
'null' in x.casefold()

True

In [8]:
t0 = ['a', 'b', 'c']
t1 = ['c', 'e', 'f']

# Elements of t0 that do not appear in t1.
set(t0).difference(t1)

{'a', 'b'}