In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [16]:
# Three small frames that share an `id` key and all carry a same-named `value`
# column. first's id=1 row has a null value, so the coalesce step later has a
# null to skip over.
first = spark.createDataFrame([{'id': 1, 'value': None}, {'id': 2, 'value': 2}])
second = spark.createDataFrame([{'id': 1, 'value': 1}, {'id': 2, 'value': 22}])
third = spark.createDataFrame([{'id': 1, 'value': 10}, {'id': 2, 'value': 226}])

In [21]:
from collections import defaultdict

# Columns present in several frames that should be merged with coalesce()
# after the join; kept as a set for membership tests in the rename loop.
coalesces = {'value'}
# Original column name -> its per-frame disambiguated names, in frame order.
renamed_columns = defaultdict(list)

In [22]:
# Frames to join, in order. The next cell replaces these entries in place
# with renamed copies.
dfs = [first,second,third]

In [23]:
# Give every column listed in `coalesces` a frame-specific name
# (__<frame index>_<column>) so the joins below do not produce duplicate column
# names. Record the new names per original column for the coalesce step.
# The loop variable is deliberately not called `col`: that would overwrite
# pyspark.sql.functions.col, which the star import at the top brings in.
for idx, df in enumerate(dfs):
    for column_name in df.columns:
        if column_name in coalesces:
            disambiguation = "__{}_{}".format(idx, column_name)
            df = df.withColumnRenamed(column_name, disambiguation)
            renamed_columns[column_name].append(disambiguation)
    # Written back into `dfs`. On a re-run the renamed columns no longer match
    # `coalesces`, so nothing is renamed or recorded twice.
    dfs[idx] = df

In [24]:
# Check that each frame's `value` column now carries its frame-index prefix.
dfs

[DataFrame[id: bigint, __0_value: bigint],
 DataFrame[id: bigint, __1_value: bigint],
 DataFrame[id: bigint, __2_value: bigint]]

In [25]:
from functools import reduce

# Left-fold the renamed frames into one inner join on `id`
# (positional join arguments: other, on, how).
joined_df = reduce(lambda left, right: left.join(right, 'id', 'inner'), dfs)

In [26]:
# After renaming, the three `value` columns have distinct names in the joined frame.
joined_df.show()

+---+---------+---------+---------+
| id|__0_value|__1_value|__2_value|
+---+---------+---------+---------+
|  1|     null|        1|       10|
|  2|        2|       22|      226|
+---+---------+---------+---------+



In [14]:

# Collapse each group of disambiguated columns back into one column holding the
# first non-null value, in frame order (__0_, __1_, ...), then drop the
# per-frame copies.
# The loop variable is not called `col`, so pyspark.sql.functions.col
# (from the star import) is not overwritten.
for column_name, disambiguations in renamed_columns.items():
    joined_df = joined_df.withColumn(column_name, coalesce(*disambiguations))
    # DataFrame.drop accepts several column names at once.
    joined_df = joined_df.drop(*disambiguations)

In [15]:
# `value` now holds the first non-null of the per-frame columns in each row.
joined_df.show()

+---+-----+
| id|value|
+---+-----+
|  1|    1|
|  2|    2|
+---+-----+



In [27]:
# Left fold with addition: ((1 + 2) + 3) + 4.
result = reduce(lambda acc, n: acc + n, [1, 2, 3, 4])
print(result)

10


In [30]:
# Joining on a list of column names keeps a single `id`, but every frame's
# `value` survives. The result has three columns with the same name (see output),
# which is the ambiguity the renaming approach above avoids.
first.join(second,["id"],how="inner").join(third,["id"],how="inner").show()

+---+-----+-----+-----+
| id|value|value|value|
+---+-----+-----+-----+
|  1| null|    1|   10|
|  2|    2|   22|  226|
+---+-----+-----+-----+



In [33]:
from pyspark.sql import DataFrame
def performJoin(left:DataFrame,
               right:DataFrame,
               condition:str=None,
               joinType:str=None) -> DataFrame:
    """Join ``left`` with ``right``; a thin wrapper over ``DataFrame.join``.

    ``condition`` is forwarded as ``on`` and ``joinType`` as ``how``. Passing
    ``None`` for either leaves the choice to ``DataFrame.join``.
    """
    joined = left.join(right, on=condition, how=joinType)
    return joined

In [43]:
performJoin(first, second, condition="id", joinType="leftouter").show()

+---+-----+-----+
| id|value|value|
+---+-----+-----+
|  1| null|    1|
|  2|    2|   22|
+---+-----+-----+



In [38]:
# Scratch spec for a two-frame join. Deliberately NOT stored in `dfs`: the
# later reduce-join and iter(dfs) cells expect `dfs` to still hold the three
# renamed frames, as their recorded outputs show. Rebinding it here broke them
# under Restart & Run All.
pair_dfs = [first, second]
pair_conditions = ["id"]
pair_join_types = []

In [47]:
# Declarative join spec: frames to join, the join keys, and the join types.
joins = dict(
    dataFrames=[first, second],
    conditions=["id"],
    joins=["inner"],
)

In [48]:
# Inspect the join spec.
joins

{'dataFrames': [DataFrame[id: bigint, value: bigint],
  DataFrame[id: bigint, value: bigint]],
 'conditions': ['id'],
 'joins': ['inner']}

In [49]:
performJoin(*joins['dataFrames'], condition="id", joinType="leftouter").show()

+---+-----+-----+
| id|value|value|
+---+-----+-----+
|  1| null|    1|
|  2|    2|   22|
+---+-----+-----+



In [51]:
# A bare zip object only displays its repr; wrap it in list(...) to see the pairs.
zip([first,second],["id"])

<zip at 0x27caee4cb00>

In [53]:
# zip stops at the shorter input. Both inputs have two items here, so both pairs print.
for x,y in zip([first,second],["id","d"]):
    print(x,y)

DataFrame[id: bigint, value: bigint] id
DataFrame[id: bigint, value: bigint] d


In [57]:
# Same fold as the earlier join cell. It expects `dfs` to hold the renamed frames.
joined_df = reduce(
    lambda acc, frame: acc.join(frame, on='id', how='inner'),
    dfs,
)

In [58]:
# Should match the earlier three-way join output.
joined_df.show()

+---+---------+---------+---------+
| id|__0_value|__1_value|__2_value|
+---+---------+---------+---------+
|  1|     null|        1|       10|
|  2|        2|       22|      226|
+---+---------+---------+---------+



In [59]:
def add(x,y):
    """Return ``x + y``; usable as a two-argument ``reduce`` step."""
    import operator
    return operator.add(x, y)

In [69]:
# Manual walk-through of reduce(): start from an iterator over the frames.
it = iter(dfs)

In [71]:
# The recorded StopIteration comes from running this cell (In [71]) after the loop
# below (In [70]) had already exhausted `it`. Run in file order, it returns the
# first frame.
value = next(it)

StopIteration: 

In [70]:
# Prints whatever `it` has left. The output shows all three frames because this
# ran before the next(it) cell above.
for df in it:
    print(df)

DataFrame[id: bigint, __0_value: bigint]
DataFrame[id: bigint, __1_value: bigint]
DataFrame[id: bigint, __2_value: bigint]


In [65]:
def reduce(function, iterable, initializer=None):
    """Left-fold ``iterable`` with the two-argument ``function``.

    Pure-Python re-implementation of ``functools.reduce``. Defining it here
    shadows the ``reduce`` imported from functools earlier in the notebook.

    Args:
        function: called as ``function(accumulator, element)``.
        iterable: values to fold, left to right.
        initializer: starting accumulator. ``None`` means "no initializer", so
            unlike ``functools.reduce`` an explicit ``None`` seed cannot be used;
            the first element becomes the seed instead.

    Returns:
        The final accumulator, or ``initializer`` itself when ``iterable`` is empty.

    Raises:
        TypeError: if ``iterable`` is empty and no initializer is given,
            matching ``functools.reduce`` instead of leaking StopIteration.
    """
    it = iter(iterable)
    if initializer is None:
        try:
            value = next(it)
        except StopIteration:
            raise TypeError("reduce() of empty iterable with no initial value") from None
    else:
        value = initializer
    for element in it:
        value = function(value, element)
    return value

In [105]:
from typing import List
from itertools import cycle

def reduceJoins(dfs:List[DataFrame],
                conditions:List[str]=[],
                joinTypes:List[str]=[]):
    """Prototype of a join fold: print each frame after the first with its condition.

    No join is performed and nothing is returned. This only shows how
    ``conditions`` would be paired, cycling, with the frames to be joined.

    Args:
        dfs: frames to fold, left to right. The first is the join seed and is
            not printed.
        conditions: join conditions, cycled when shorter than ``dfs[1:]``.
            When empty, ``None`` is printed for every frame.
        joinTypes: currently unused.

    Raises:
        ValueError: if ``dfs`` is empty.
    """
    frames = iter(dfs)
    condition_cycle = cycle(conditions)
    try:
        next(frames)  # skip the seed frame; only frames to be joined are printed
    except StopIteration:
        raise ValueError("reduceJoins() requires at least one DataFrame") from None
    for element in frames:
        # next() with a default: cycle([]) is exhausted immediately, so a bare
        # next() would leak StopIteration when `conditions` keeps its default.
        print(element, next(condition_cycle, None))
        

In [106]:
# Prints the second and third frames with their cycled conditions ("id", then "x").
reduceJoins([first,second,third],["id","x"],["inner"])

DataFrame[id: bigint, value: bigint] id
DataFrame[id: bigint, value: bigint] x


In [11]:
from pyspark.sql import DataFrame
from typing import List
from itertools import cycle

def performJoin(left:DataFrame,
               right:DataFrame,
               condition=None,
               joinType:str=None) -> DataFrame:
    """Return ``left.join(right, on=condition, how=joinType)``.

    Args:
        left: left-hand DataFrame.
        right: right-hand DataFrame.
        condition: forwarded as ``on``. The calls in this notebook pass a column
            name or a Column expression.
        joinType: forwarded as ``how``. ``None`` defers to ``DataFrame.join``'s default.
    """
    return left.join(right, on=condition, how=joinType)

def reduceJoins(dfs:List[DataFrame],
               conditionsList:List=[],
               joinsList:List[str]=["inner"]) -> DataFrame :
    """Left-fold ``dfs`` into one DataFrame with successive ``performJoin`` calls.

    Each frame after the first is joined onto the running result. Its join
    condition and join type are the next items from ``conditionsList`` and
    ``joinsList``; each list restarts from its beginning when it is shorter than
    the number of joins.

    Args:
        dfs: DataFrames to join, left to right. Must contain at least one frame.
        conditionsList: values passed as ``on`` (column names or Column
            expressions). When empty, every join gets ``None``.
        joinsList: values passed as ``how``. When empty, every join gets ``None``.

    Returns:
        The joined DataFrame, or ``dfs[0]`` unchanged when only one frame is given.

    Raises:
        ValueError: if ``dfs`` is empty.

    The default lists are only iterated, never modified, so sharing them
    between calls is harmless. Column conditions are never truth-tested here;
    Spark raises ValueError when a Column is converted to bool.
    """
    frames = iter(dfs)
    conditions = cycle(conditionsList)
    joins = cycle(joinsList)
    try:
        value = next(frames)
    except StopIteration:
        raise ValueError("reduceJoins() requires at least one DataFrame") from None
    for element in frames:
        # next() with a default: cycle([]) is exhausted immediately, so an empty
        # spec list yields None for every join instead of leaking StopIteration.
        nextCondition = next(conditions, None)
        nextJoin = next(joins, None)
        value = performJoin(value, element, nextCondition, nextJoin)
    return value

In [13]:
# NOTE(review): needs the first_id/second_id/third_id frames from the In [3] cell
# at the bottom of the notebook. Run top to bottom, `first` only has `id`/`value`
# at this point, so `first.first_id` fails. Move that cell above this one.
# The conditions are built on `second` while second.hint("broadcast") is the frame
# passed in; the recorded output shows they still resolve.
reduceJoins([first,second.hint("broadcast"),third],[first.first_id==second.second_id,
                                 second.second_id==third.third_id]).show()

+--------+-----+---------+-----+--------+-----+
|first_id|value|second_id|value|third_id|value|
+--------+-----+---------+-----+--------+-----+
|       1| null|        1|    1|       1|   10|
|       2|    2|        2|   22|       2|  226|
+--------+-----+---------+-----+--------+-----+



In [3]:
# Redefine the frames with per-frame key names so join conditions can refer to
# each frame's key unambiguously (e.g. first.first_id == second.second_id).
# NOTE(review): this replaces the `id`-keyed frames the earlier cells use;
# distinct variable names would avoid clobbering them.
first = spark.createDataFrame([{'first_id': 1, 'value': None}, {'first_id': 2, 'value': 2}])
second = spark.createDataFrame([{'second_id': 1, 'value': 1}, {'second_id': 2, 'value': 22}])
third = spark.createDataFrame([{'third_id': 1, 'value': 10}, {'third_id': 2, 'value': 226}])



ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.