In [None]:
import logging

from pyspark import SparkContext, SQLContext, SparkConf
from typing import Optional


# Root directory under which Spark scratch space and checkpoints are placed.
DATA_PATH = "/mnt"

# Shared Spark handles; both stay None until setup_context() fills them in.
SPARK_CONTEXT: Optional[SparkContext] = None
SQL_CONTEXT: Optional[SQLContext] = None


def config_pyspark_submit_args():
    """
    Build the SparkConf used when the shared SparkContext is created.

    All settings are listed as (key, value) pairs in one table and applied to a
    fresh SparkConf in the order they are declared.

    :return: a populated SparkConf
    """
    scratch_dir = f"{DATA_PATH}/spark_tmp"

    settings = [
        # Heap size of the driver process (the process where SparkContext lives).
        ("spark.driver.memory", "100g"),
        # Cap on the total serialized size of results sent back to the driver by one
        # action (e.g. collect); "0" removes the cap so large collects are never aborted.
        ("spark.driver.maxResultSize", "0"),
        # Heap fraction for in-memory shuffle aggregation before data spills to disk.
        # NOTE(review): legacy memory-management key; newer Spark versions size shuffle
        # memory through spark.memory.fraction instead — confirm it still has an effect.
        ("spark.shuffle.memoryFraction", "0.3"),
        # Memory per Python worker during aggregation; above this it spills to disk.
        ("spark.python.worker.memory", "8g"),
        # Kryo is faster and more compact than the default Java serialization,
        # but does not support every Serializable type.
        ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
        # Maximum Kryo buffer size; no unit suffix given — presumably MiB, confirm.
        ("spark.kryoserializer.buffer.max", "2000"),
        # Share of (heap - 300MB) used for execution + storage. Lowered from Spark's
        # 0.6 default to leave more room for user data structures and metadata.
        ("spark.memory.fraction", "0.4"),
        # Part of the region above reserved for cached blocks immune to eviction.
        ("spark.memory.storageFraction", "0.5"),
        # Keep Spark's and the JVMs' temporary files under DATA_PATH.
        ("spark.local.dir", scratch_dir),
        ("spark.executor.extraJavaOptions", f"-Djava.io.tmpdir={scratch_dir}"),
        ("spark.driver.extraJavaOptions", f"-Djava.io.tmpdir={scratch_dir}"),
        # Use Apache Arrow for columnar JVM <-> Python (pandas/NumPy) transfers.
        ("spark.sql.execution.arrow.enabled", "true"),
        # Partitions used when shuffling for joins/aggregations. Rule of thumb:
        # 2-3x the machine's cores, keeping each partition below ~128MB.
        ("spark.sql.shuffle.partitions", "100"),
    ]

    conf = SparkConf()
    for key, value in settings:
        conf.set(key, value)
    return conf


def setup_context():
    """
    Create (or reuse) the shared SparkContext/SQLContext and store them in the
    SPARK_CONTEXT / SQL_CONTEXT module globals.

    SparkContext.getOrCreate is used instead of the constructor. Re-executing the
    notebook cell that resets the globals to None leaves the JVM-side context
    alive, and constructing a second SparkContext would then fail with
    "Cannot run multiple SparkContexts at once". When an existing context is
    reused, the settings from config_pyspark_submit_args() are not re-applied to it.
    """
    global SPARK_CONTEXT
    global SQL_CONTEXT

    config = config_pyspark_submit_args()

    SPARK_CONTEXT = SparkContext.getOrCreate(conf=config)
    SQL_CONTEXT = SQLContext(SPARK_CONTEXT)

    # Quiet both the Python-side py4j logger and Spark's own log output.
    logging.getLogger('py4j').setLevel(logging.ERROR)
    SPARK_CONTEXT.setLogLevel("ERROR")

    # Checkpointing (a Spark Core feature that Spark SQL builds on) persists an RDD
    # and truncates its lineage; useful for iterative algorithms such as ALS.
    # https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-checkpointing.html
    checkpoint_path = f"{DATA_PATH}/checkpoint/"
    SPARK_CONTEXT.setCheckpointDir(checkpoint_path)



def get_sql_context() -> SQLContext:
    """Return the shared SQLContext, running setup_context() first if it is not set yet."""
    if SQL_CONTEXT:
        return SQL_CONTEXT
    setup_context()
    return SQL_CONTEXT

    
def stop_context():
    """
    Stop the shared SparkContext, if one was started, and clear both globals.

    Clearing SQL_CONTEXT matters: otherwise a later get_sql_context() would hand
    out a SQLContext bound to the stopped SparkContext instead of building a new one.
    Calling this when nothing was started is a no-op.
    """
    global SPARK_CONTEXT
    global SQL_CONTEXT

    if SPARK_CONTEXT is not None:
        SPARK_CONTEXT.stop()

    SPARK_CONTEXT = None
    SQL_CONTEXT = None


In [None]:
import os
from datetime import datetime
from functools import reduce
from io import BytesIO
from typing import List, Union, Optional
import logging
import pyspark.sql.functions as SF
from pyspark.sql import DataFrame, DataFrameWriter
from pyspark.sql.types import StringType



def cast_column_type(df1: DataFrame, df2: DataFrame, column: str) -> DataFrame:
    """
    Cast ``column`` of ``df2`` to the data type the same column has in ``df1``.

    :param df1: reference dataframe whose column type is considered correct
    :param df2: dataframe whose column is cast
    :param column: name of the column to cast
    :return: ``df2`` with ``column`` cast to df1's type
    :raises ValueError: if ``df1`` has no column named ``column``
    """
    # Find the reference field (name + dataType) in df1's schema.
    reference_field = next((field for field in df1.schema if field.name == column), None)

    if reference_field is None:
        raise ValueError(f"Column  {column} for cast doesn't exists")

    # Build the cast from df2's own column. A column object taken from df1 cannot be
    # resolved against df2 unless df2 happens to be derived from df1.
    return df2.withColumn(column, df2[column].cast(reference_field.dataType))


def unify_dataframe_columns(main_df: DataFrame, second_df: DataFrame) -> DataFrame:
    """
    Cast every column of ``second_df`` whose type differs from ``main_df``'s to ``main_df``'s type.

    :param main_df: reference dataframe whose column types are considered correct
    :param second_df: dataframe whose column types are aligned to ``main_df``
    :return: ``second_df`` with all mismatching columns cast
    :raises ValueError: if the two dataframes do not have the same set of column names
    """
    main_fields = {field.name: field for field in main_df.schema}
    # Read second_df's own schema; comparing main_df with itself would never find a mismatch.
    second_fields = {field.name: field for field in second_df.schema}

    if set(main_fields) != set(second_fields):
        raise ValueError("Columns from given dataframes are not the same")

    for name in sorted(main_fields):
        main_type = main_fields[name].dataType
        second_type = second_fields[name].dataType
        # If the data types differ, cast the second dataframe's column.
        if main_type != second_type:
            logging.warning(f"data types of column {name} doesn't match:"
                            f" {main_type}, {second_type}")
            second_df = cast_column_type(main_df, second_df, name)

    return second_df


def concatenate(
        dfs: List[DataFrame],
        cast_types_to_first_element=False,
        columns: Optional[List[str]] = None
) -> DataFrame:
    """
    Union several Spark DataFrames by column name, adding any missing column as nulls.

    :param dfs: dataframes to concatenate; ``None`` entries are ignored
    :param cast_types_to_first_element: if True, cast the column types of every later
        dataframe to those of the first one before the union
    :param columns: columns every dataframe must end up with; defaults to the union of
        all columns in first-seen order (deterministic, unlike iterating a ``set``)
    :return: the unioned DataFrame
    :raises ValueError: if no non-None dataframe is given
    """
    # Sanity check for Nones; a new list, so the caller's list is never mutated.
    frames = [df for df in dfs if df is not None]
    if not frames:
        raise ValueError("concatenate() needs at least one non-None DataFrame")

    if columns is None:
        # dict.fromkeys de-duplicates in linear time while keeping first-seen order.
        columns = list(dict.fromkeys(column for df in frames for column in df.columns))

    for i, df in enumerate(frames):
        present = set(df.columns)
        for column in columns:
            if column not in present:
                # Missing columns are filled with string-typed nulls.
                df = df.withColumn(column, SF.lit(None).cast(StringType()))
                present.add(column)
        frames[i] = df

    if len(frames) > 1 and cast_types_to_first_element:
        for i in range(1, len(frames)):
            frames[i] = unify_dataframe_columns(frames[0], frames[i])

    return reduce(lambda left, right: left.unionByName(right), frames)

def read_csv(path: str) -> DataFrame:
    """
    Load CSV data at ``path`` into a Spark DataFrame.

    The first line is used as the header and column types are inferred by Spark.
    """
    reader = get_sql_context().read.format("com.databricks.spark.csv")
    reader = reader.option("header", "true").option("inferSchema", "true")
    return reader.load(path)


def write_csv(df: DataFrame, path: str, single=False, mode: str = "overwrite") -> None:
    """
    Write a Spark DataFrame as CSV (with a header row) to the directory ``path``.

    :param df: Spark DataFrame to write
    :param path: output directory
    :param single: if True, coalesce to one partition so a single part file is produced
    :param mode: Spark save mode (default "overwrite", matching write_parquet)
    """
    if single:
        # coalesce(1) routes all rows through one task; only sensible for small outputs.
        df = df.coalesce(1)

    # Spark does not write a pandas-style index column, so no index option is passed;
    # unknown keyword options given to save() would just become ignored writer options.
    df \
        .write \
        .mode(mode) \
        .option("header", "true") \
        .format("com.databricks.spark.csv") \
        .save(path)


def read_table(path: str, table_name: str) -> DataFrame:
    """Read the catalog table ``table_name`` via the shared SQL context, passing ``path`` as the "path" reader option."""
    reader = get_sql_context().read
    return reader.option("path", path).table(table_name)


def write_table(
        df: DataFrame, path: str, table_name: str,
        buckets: int = 20, bucket_columns: Union[str, List[str]] = None, mode: str = "overwrite",
        sort_column: str = None
):
    """
    Save ``df`` as the catalog table ``table_name`` stored at ``path``.

    Bucketing is used for saving partitioned parquet files and for join optimization:
    https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-bucketing.html

    :param df: Spark DataFrame to save
    :param path: storage location, passed through validate_cluster_path()
    :param table_name: name of the table to create/overwrite
    :param buckets: number of buckets (used only when bucket_columns is given)
    :param bucket_columns: column name(s) to bucket by; None disables bucketing
    :param mode: Spark save mode
    :param sort_column: column to sort each bucket by; None disables sorting
        (NOTE(review): Spark presumably expects sortBy together with bucketBy — confirm)

    NOTE(review): validate_cluster_path is not defined anywhere in this notebook's
    visible cells; calling this function raises NameError unless it is defined elsewhere.
    """
    writer = df.write.mode(mode)

    if bucket_columns is not None:
        writer = writer.bucketBy(buckets, bucket_columns)
    if sort_column is not None:
        writer = writer.sortBy(sort_column)

    target_path = validate_cluster_path(path)
    writer.option("path", target_path).saveAsTable(table_name)


def read_parquet(path: str) -> DataFrame:
    """Load parquet data at ``path`` into a Spark DataFrame using the shared SQL context."""
    sql_context = get_sql_context()
    return sql_context.read.parquet(path)


def write_parquet(df: DataFrame, path: str, mode="overwrite", single=False):
    """
    Write ``df`` as parquet to ``path`` with the given Spark save ``mode``.

    :param single: if True, coalesce to one partition first so one part file is written
    """
    target = df.coalesce(1) if single else df
    target.write.mode(mode).parquet(path)

def check_path_exists(path: str, is_spark_dir: bool = False):
    """
    Return True if ``path`` exists.

    With ``is_spark_dir=True`` the path must also contain a ``_SUCCESS`` marker file,
    which is how a completed Spark output directory is recognised here.
    """
    if not os.path.exists(path):
        return False
    if not is_spark_dir:
        return True
    return check_path_exists(os.path.join(path, "_SUCCESS"))

In [None]:
import pickle
import json
import numpy as np


def save_pickle(path, obj, *args, **kwargs):
    """Pickle ``obj`` into the file ``path``; extra arguments are forwarded to pickle.dump (e.g. protocol)."""
    with open(path, "wb") as handle:
        pickle.dump(obj, handle, *args, **kwargs)

def load_pickle(path, required: bool = False):
    """
    Unpickle and return the object stored in the file ``path``.

    :param required: currently unused — a missing file always raises FileNotFoundError
    Security: pickle.load can execute arbitrary code; only load files you trust.
    """
    with open(path, "rb") as handle:
        return pickle.load(handle)


class NpEncoder(json.JSONEncoder):
    """JSON encoder that turns NumPy scalars and arrays into native Python values."""

    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.bool_):
            # np.bool_ is not a subclass of Python bool, so the base encoder rejects it.
            return bool(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        # Anything else: defer to the base class, which raises TypeError.
        return super().default(obj)


def save_json(path, obj, use_np_encoder: bool = True):
    """
    Serialise ``obj`` as JSON into the file ``path``.

    :param use_np_encoder: encode with NpEncoder (NumPy-aware) instead of the default encoder
    """
    encoder = NpEncoder if use_np_encoder else None
    with open(path, "w") as handle:
        json.dump(obj, handle, cls=encoder)


def load_json(path, required: bool = False):
    """
    Parse and return the JSON document stored in the file ``path``.

    :param required: currently unused — a missing file always raises FileNotFoundError
    """
    with open(path, "r") as handle:
        return json.load(handle)


In [1]:
import pandas as pd
from tqdm import tqdm_notebook

def read_pandas_from_spark(path, read_function=pd.read_csv, file_type="csv"):
    """
    Read Spark output (a single file, or a directory of part files) into one pandas DataFrame.

    :param path: a single file, or a directory written by Spark
    :param read_function: pandas reader applied to each file (e.g. pd.read_csv, pd.read_parquet)
    :param file_type: extension (without the dot) of the part files to read
    :return: a pandas DataFrame; multiple part files are concatenated in file-name order
    :raises FileNotFoundError: if ``path`` is neither a file nor a directory
    :raises ValueError: if the directory holds no readable ``.{file_type}`` files
    """
    if os.path.isfile(path):
        return read_function(path)
    if not os.path.isdir(path):
        raise FileNotFoundError(path)

    # Only the top level of `path` is needed; next() stops os.walk after the first level.
    _, _, file_names = next(os.walk(path))
    # Skip hidden files (e.g. ".crc" checksums) and sort so part-00000, part-00001, ...
    # are concatenated in a stable, reproducible order.
    candidates = sorted(
        name for name in file_names
        if not name.startswith(".") and name.endswith(f".{file_type}")
    )

    dataframes = []
    for file in tqdm_notebook(candidates):
        file_path = os.path.join(path, file)
        try:
            dataframes.append(read_function(file_path))
        except Exception as exc:
            # Best-effort: skip unreadable parts, but let KeyboardInterrupt/SystemExit through.
            print(f"can't read file - {file} from spark {file_type}: {exc}")

    if not dataframes:
        raise ValueError(f"no readable .{file_type} files found in {path}")
    if len(dataframes) == 1:
        return dataframes[0]

    return pd.concat(dataframes, copy=False).reset_index(drop=True)
    
def read_pandas_parquet_from_spark(path):
    """Parquet flavour of read_pandas_from_spark: reads .parquet part files with pd.read_parquet."""
    return read_pandas_from_spark(path, file_type="parquet", read_function=pd.read_parquet)

In [None]:
from sklearn import preprocessing

In [None]:
def reduce_mem_usage(df, verbose=True):
    """
    Downcast 64-bit numeric columns of ``df`` to 32-bit where their values fit.

    int64 columns become int32, and float64 columns become float32, when the column's
    min and max lie strictly inside the 32-bit range. Columns that are already 32-bit
    or narrower (int16, int32, float16, float32) are left unchanged. Casting them to
    the 32/64-bit targets saves nothing and increases memory: int16 -> int32,
    float16 -> float32, or float32 holding inf / only NaN -> float64.
    Note that float64 -> float32 keeps only ~7 significant digits.

    :param df: pandas DataFrame; modified in place and also returned
    :param verbose: print the resulting memory usage and the % reduction
    :return: the same DataFrame object
    """
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    int32_info = np.iinfo(np.int32)
    float32_info = np.finfo(np.float32)

    start_mem = df.memory_usage().sum() / 1024**2
    for col in df.columns:
        col_type = df[col].dtypes
        # Only 64-bit columns can shrink to the 32-bit targets used below.
        if col_type not in numerics or col_type.itemsize <= 4:
            continue
        c_min = df[col].min()
        c_max = df[col].max()
        if str(col_type).startswith('int'):
            if int32_info.min < c_min and c_max < int32_info.max:
                df[col] = df[col].astype(np.int32)
        elif float32_info.min < c_min and c_max < float32_info.max:
            # NaN min/max fail both comparisons, so all-NaN columns stay float64.
            df[col] = df[col].astype(np.float32)
    end_mem = df.memory_usage().sum() / 1024**2
    if verbose:
        print('Mem. usage decreased to {:5.2f} Mb ({:.1f}% reduction)'.format(end_mem, 100 * (start_mem - end_mem) / start_mem))
    return df

In [None]:
import pandas as pd
def encode_categorical(df, cols):
    """
    Label-encode each column listed in ``cols``, in place, and return ``df``.

    Only non-null values are fed to the encoder; assigning the encoded Series back
    aligns on the index, so rows that were null stay NaN.
    """
    for column_name in cols:
        column = df[column_name]
        present = column[column.notnull()]
        encoder = preprocessing.LabelEncoder()
        codes = encoder.fit_transform(present)
        df[column_name] = pd.Series(codes, index=present.index)

    return df