In [1]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one; otherwise start a new one
# (no builder options are set, so Spark's defaults apply).
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [49]:
# Small in-memory sample frame. The columns are named explicitly, in the same
# order that dict-based schema inference produced (keys sorted: amount, id,
# value), instead of relying on that implicit sorting.
df = spark.createDataFrame(
    [(2, 1, 1), (3, 2, 2)],
    ["amount", "id", "value"],
)

In [74]:
from pyspark.sql import DataFrame
from typing import List
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.column import Column
from pyspark.sql import Window
import datetime
import decimal
from typing import List, Dict, Union, Any

class Orca(object):
    """Chainable wrapper around a Spark ``DataFrame``.

    Each transformation method returns a *new* ``Orca`` wrapping the
    transformed frame, so calls can be chained; call ``returnDF()`` to get the
    underlying Spark DataFrame back.
    """

    # Python type -> Spark SQL DataType class, so ``selectByDtype`` accepts
    # e.g. ``str`` as shorthand for ``StringType``.
    # NOTE(review): ``datetime.time`` is mapped to TimestampType — confirm
    # that is the intended match.
    _type_mappings = {
        type(None): NullType,
        bool: BooleanType,
        int: LongType,
        float: DoubleType,
        str: StringType,
        bytearray: BinaryType,
        decimal.Decimal: DecimalType,
        datetime.date: DateType,
        datetime.datetime: TimestampType,
        datetime.time: TimestampType,
        bytes: BinaryType,
    }

    def __init__(self, df):
        """Wrap ``df`` (a Spark DataFrame)."""
        self.df = df

    def returnDF(self):
        """Return the wrapped Spark DataFrame."""
        return self.df

    def _renameColumns(self, colsList, newName) -> "Orca":
        """Select every column, aliasing those in ``colsList`` via ``newName``.

        ``colsList`` of ``None``/empty means "all columns"; names that are not
        in the frame are ignored. Column order is preserved.
        """
        targets = set(colsList) if colsList else set(self.df.columns)
        # ``col`` comes from the ``pyspark.sql.functions`` star import; the
        # loop variable is ``name`` so it does not shadow it.
        selected = [
            col(name).alias(newName(name)) if name in targets else col(name)
            for name in self.df.columns
        ]
        return Orca(self.df.select(*selected))

    def addColumnSuffix(
        self, suffix: str = "suffix", colsList: Union[List, None] = None
    ) -> "Orca":
        """Rename selected columns to ``<name>_<suffix>``.

        Args:
            suffix: text appended after an underscore.
            colsList: names of the columns to rename; ``None`` or empty means
                every column.

        Returns:
            A new ``Orca``; column order is unchanged.
        """
        return self._renameColumns(
            colsList, lambda name: "{0}_{1}".format(name, suffix)
        )

    def addColumnPrefix(
        self, prefix: str = "prefix", colsList: Union[List, None] = None
    ) -> "Orca":
        """Rename selected columns to ``<prefix>_<name>``.

        Args:
            prefix: text prepended before an underscore.
            colsList: names of the columns to rename; ``None`` or empty means
                every column.

        Returns:
            A new ``Orca``; column order is unchanged.
        """
        return self._renameColumns(
            colsList, lambda name: "{0}_{1}".format(prefix, name)
        )

    def dedupsByRank(
        self,
        partitionCols: List,
        orderCols: Union[List, Dict[str, List]],
        rankType: Union[Column, None] = None,
    ) -> "Orca":
        """Keep, for each ``partitionCols`` group, the rows ranked first.

        Args:
            partitionCols: column names defining each group.
            orderCols: either a list/tuple of column names (default sort
                direction), or a dict with key ``"columns"`` (the ordered
                column names) and optional keys ``"asc"`` / ``"desc"`` listing
                which of those columns sort ascending / descending. A column in
                both lists sorts ascending.
            rankType: ranking expression evaluated over the window; defaults to
                ``row_number()`` (exactly one row per group).

        Returns:
            A new ``Orca`` containing only rows whose rank equals 1.

        Raises:
            KeyError: if a dict ``orderCols`` has no ``"columns"`` key.
        """
        if rankType is None:
            # Built per call rather than as a default argument: row_number()
            # talks to the active SparkContext, which may not exist yet when
            # the class body is executed.
            rankType = row_number()

        if isinstance(orderCols, (list, tuple)):
            orderCols = {"columns": list(orderCols)}
        ascCols = set(orderCols.get("asc", []))
        descCols = set(orderCols.get("desc", []))

        def _sortKey(name):
            # ascending wins if a column is listed in both "asc" and "desc".
            if name in ascCols:
                return col(name).asc()
            if name in descCols:
                return col(name).desc()
            return col(name)

        window = Window.partitionBy(
            *[col(name) for name in partitionCols]
        ).orderBy(*[_sortKey(name) for name in orderCols["columns"]])

        # Pick a helper column name that cannot overwrite (and then drop) an
        # existing column; Spark resolves names case-insensitively by default.
        taken = {name.lower() for name in self.df.columns}
        rankCol = "rank"
        while rankCol.lower() in taken:
            rankCol = "_" + rankCol

        return Orca(
            self.df.withColumn(rankCol, rankType.over(window))
            .filter(col(rankCol) == 1)
            .drop(rankCol)
        )

    def selectByDtype(self, dtypes: Any) -> "Orca":
        """Keep only the columns whose Spark type is an instance of ``dtypes``.

        Args:
            dtypes: a Spark DataType class (e.g. ``StringType``), a Python type
                listed in ``_type_mappings`` (e.g. ``str``), or a list/tuple of
                such types. Python types are translated through
                ``_type_mappings``; anything else is passed to ``isinstance``
                unchanged. The caller's list is not modified.

        Returns:
            A new ``Orca`` selecting the matching columns in schema order.
        """
        def _toSpark(t):
            # "translate if known, otherwise keep as-is"
            return self._type_mappings.get(t, t)

        if isinstance(dtypes, (list, tuple)):
            dtypes = tuple(_toSpark(t) for t in dtypes)
        else:
            dtypes = _toSpark(dtypes)

        # A comprehension instead of filter(None.__ne__, ...): that idiom
        # depends on bool(NotImplemented), and builtin ``filter`` can be
        # shadowed by the pyspark.sql.functions star import.
        selected = [
            field.name
            for field in self.df.schema
            if isinstance(field.dataType, dtypes)
        ]
        return Orca(self.df.select(*selected))

In [83]:
import os

# The default is a hardcoded absolute local path; set ORCA_MEMBERSHIP_PATH to
# load the membership data from elsewhere without editing the notebook.
MEMBERSHIP_PATH = os.environ.get(
    "ORCA_MEMBERSHIP_PATH",
    r'C:\work\python-packages\orca\data\membership.parquet',
)
df = spark.read.parquet(MEMBERSHIP_PATH)

In [86]:
# Sanity check of the loaded frame: row count (runs a Spark job), then schema.
print('Count: ', df.count())
print("Schema:")
df.printSchema()

Count:  20000
Schema:
root
 |-- uniqueID: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- recID: string (nullable = true)
 |-- cost: double (nullable = true)
 |-- dates: string (nullable = true)



In [87]:
# Wrap the Spark DataFrame so the chainable Orca helpers can be applied to it.
od = Orca(df=df)

In [93]:
# Per uniqueID keep the row with the earliest `dates`, breaking ties by the
# highest `cost`.
orderCols = {'columns': ['dates', 'cost'], 'asc': ['dates'], 'desc': ['cost']}

# Orca methods chain; parentheses (not a trailing backslash, which cannot be
# followed by a comment) let each step carry its own comment.
odf = (
    od.dedupsByRank(['uniqueID'], orderCols)  # apply deduplication by rank
    .selectByDtype(str)  # select columns of only string type
    .returnDF()
)

In [95]:
# Sanity check of the deduplicated, string-only frame: row count, then schema.
print('Count: ', odf.count())
print("Schema:")
odf.printSchema()

Count:  1000
Schema:
root
 |-- Name: string (nullable = true)
 |-- recID: string (nullable = true)
 |-- dates: string (nullable = true)



In [77]:
# NOTE(review): this cell originally read `df.count()` but ran before `df` was
# loaded; its recorded output (1000) matches `odf`, so it now uses `odf`.
odf.count()

1000

In [78]:
# NOTE(review): the recorded output shows only Name/recID/dates, i.e. the
# deduplicated string-only frame `odf`, not the freshly loaded `df`.
odf.show()

+-----------------+---------+-------------------+
|             Name|    recID|              dates|
+-----------------+---------+-------------------+
|     Bruno Sparks|ID5342509|2000-10-19 10:00:27|
|    Markus Kelley|ID9563493|2003-06-04 01:08:37|
|    Isidro Reeves|ID2380131|2001-06-17 07:19:24|
|    Kurtis Garcia|ID3464937|2000-04-21 15:01:52|
|  Lavonia Summers|ID1344139|2000-05-10 04:12:18|
|    Arlie Bernard|ID3411614|2001-07-19 13:01:53|
|      Lizeth Lara|ID6967628|2000-04-21 23:53:27|
|  Isaiah Mcintyre|ID3409715|2005-07-09 01:59:49|
|     Roni Ramirez|ID4970503|2000-11-29 03:52:38|
|       Leon Burch|ID9247350|2001-07-30 12:25:34|
| Margaretta Kelly|ID7673732|2000-09-19 17:03:40|
|Letisha Blackburn|ID2437465|2001-02-26 04:44:24|
|  Somer Patterson|ID0556149|2000-08-27 17:40:15|
|       Damon Pace|ID3268057|2001-02-22 00:31:44|
|     Jude Camacho|ID5081762|2001-04-20 05:37:06|
|    Cythia Reeves|ID9220392|2000-04-19 17:12:37|
|     Halina Hardy|ID5092522|2000-04-15 21:11:47|


In [79]:
# NOTE(review): the recorded schema (Name, recID, dates) is that of `odf`.
odf.printSchema()

root
 |-- Name: string (nullable = true)
 |-- recID: string (nullable = true)
 |-- dates: string (nullable = true)



In [48]:
# Public-API equivalent of print(df._jdf.showString(20, 20, True)): first 20
# rows, values truncated to 20 characters, one record per block.
# NOTE(review): this cell ran (In [48]) before `df` was created in In [49]; the
# prefix_*_suffix columns in its output come from a cell not present here.
df.show(n=20, truncate=20, vertical=True)

-RECORD 0-------------------
 prefix_amount_suffix | 2   
 prefix_id_suffix     | 1   
 prefix_value_suffix  | 1   
-RECORD 1-------------------
 prefix_amount_suffix | 3   
 prefix_id_suffix     | 2   
 prefix_value_suffix  | 2   

