In [1]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import pandas as pd

In [2]:
# Toy transaction log for consumers '11' and '12'. In this sample, `mx` equals each
# consumer's number of rows (5 for '11', 3 for '12') and `row_num` numbers them.
df = spark.createDataFrame(
    data=[
        ('11', '1', '1152397078', 'VVVVM', 1, '3/5/2020', 1, 5),
        ('11', '1', '1152944770', 'VVVVV', 1, '3/6/2020', 2, 5),
        ('11', '1', '1153856408', 'VVVVV', 1, '3/15/2020', 3, 5),
        ('11', '2', '1155884040', 'MVVVV', 1, '4/2/2020', 4, 5),
        ('11', '2', '1156854301', 'MMVVV', 0, '4/17/2020', 5, 5),
        ('12', '1', '1156854302', 'VVVVM', 1, '3/6/2020', 1, 3),
        ('12', '1', '1156854303', 'VVVVV', 1, '3/7/2020', 2, 3),
        ('12', '2', '1156854304', 'MVVVV', 1, '3/16/2020', 3, 3),
    ],
    schema=["consumer_id", "product_id", "TRX_ID", "pattern",
            "loyal", "trx_date", "row_num", "mx"],
)
df.show()

In [3]:
# Fixed module name: the previous line imported "pypsark", which raises ModuleNotFoundError.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Flag = 1 when the row is loyal and the consumer's next row (by row_num)
# is either not loyal or does not exist. Otherwise Flag = 0.
w = Window().partitionBy("consumer_id").orderBy('row_num')
lead = F.lead("loyal").over(w)
df.withColumn(
    "Flag",
    F.when((F.col("loyal") == 1) & ((lead == 0) | lead.isNull()), F.lit(1))
     .otherwise(F.lit(0)),
).show()

In [4]:
# Sample rows: [id, column_b, column_a]. The variable is named `rows` instead of
# `list`, so the builtin `list` is not shadowed for the rest of the notebook.
rows = [[1, 23, 90],
        [2, 12, 45],
        [2, 38, 80],
        [2, 91, 62],
        [1, 11, 21],
        [1, 29, 57],
        [1, 13, 68],
        [2, 14, 19]]

df = spark.createDataFrame(rows, ['id', 'column_b', 'column_a'])

df.select("id", "column_a", "column_b").show()

In [5]:
You can use **`collect_list`** over **only one** **`window`**, and then use the *higher-order function* **`aggregate`** to get the desired result **(sum/sum)**.

    df.show() #sample dataframe
    #+---+--------+--------+
    #| id|column_a|column_b|
    #+---+--------+--------+
    #|  1|      90|      23|
    #|  2|      45|      12|
    #|  2|      80|      38|
    #|  2|      62|      91|
    #|  1|      21|      11|
    #|  1|      57|      29|
    #|  1|      68|      13|
    #|  2|      19|      14|
    #+---+--------+--------+
    
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    
    
    window=Window().partitionBy("id")
    df.withColumn("column_c",F.collect_list(F.array("column_a","column_b")).over(window))\
      .withColumn("column_c", F.expr("""aggregate(column_c,0,(acc,x)-> int(x[0])+acc)/\
                                   aggregate(column_c,0,(acc,x)-> int(x[1])+acc)""")).show()

    #+---+--------+--------+------------------+
    #| id|column_b|column_a|          column_c|
    #+---+--------+--------+------------------+
    #|  1|      23|      90|3.1052631578947367|
    #|  1|      11|      21|3.1052631578947367|
    #|  1|      29|      57|3.1052631578947367|
    #|  1|      13|      68|3.1052631578947367|
    #|  2|      12|      45|1.3290322580645162|
    #|  2|      38|      80|1.3290322580645162|
    #|  2|      91|      62|1.3290322580645162|
    #|  2|      14|      19|1.3290322580645162|
    #+---+--------+--------+------------------+

**Instead of** (two windows):

    window=Window().partitionBy("id")
    df.withColumn("colum_c",F.sum(F.col("column_a")).over(window)\
                                  /F.sum(F.col("column_b")).over(window)).show()


In [6]:
from pyspark.sql import functions as f

# Sample events: [account_id, column_b, column_a, event_date]. The variable is named
# `rows` instead of `list`, so the builtin `list` is not shadowed.
rows = [[1, 23, 90, '2019-2-23'],
        [1, 12, 45, '2019-2-28'],
        [1, 38, 80, '2019-3-21'],
        [1, 91, 62, '2019-3-24'],
        [2, 11, 21, '2019-3-29'],
        [2, 29, 57, '2019-1-08'],
        [2, 13, 68, '2019-1-12'],
        [2, 14, 19, '2019-1-14']]

df = spark.createDataFrame(rows, ['account_id', 'column_b', 'column_a', 'event_date'])

# Reorder the columns so column_a comes before column_b.
df = df.select("account_id", "column_a", "column_b", "event_date")

In [7]:
df.show() #sample data

#+----------+--------+--------+----------+
#|account_id|column_a|column_b|event_date|
#+----------+--------+--------+----------+
#|         1|      90|      23| 2019-2-23|
#|         1|      45|      12| 2019-2-28|
#|         1|      80|      38| 2019-3-21|
#|         1|      62|      91| 2019-3-24|
#|         2|      21|      11| 2019-3-29|
#|         2|      57|      29| 2019-1-08|
#|         2|      68|      13| 2019-1-12|
#|         2|      19|      14| 2019-1-14|
#+----------+--------+--------+----------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Days -> seconds. Because of the (i-1), -days(7) reaches six days back, so the
# frame covers the current event plus events up to six days earlier.
days = lambda i: (i-1) * 86400

# Trailing window per account, ordered by event time in epoch seconds. This uses F,
# imported in this cell, rather than the `f` alias bound by a different cell.
window = (
    Window()
    .partitionBy(F.col("account_id"))
    .orderBy(F.col("event_date").cast("timestamp").cast("long"))
    .rangeBetween(-days(7), 0)
)

# A single window pass collects [column_a, column_b] pairs. The two aggregate()
# calls then sum each side, giving sum(column_a) / sum(column_b) over the frame.
df.withColumn("column_c",F.collect_list(F.array("column_a","column_b")).over(window))\
  .withColumn("column_c", F.expr("""aggregate(column_c,0,(acc,x)-> int(x[0])+acc)/\
                               aggregate(column_c,0,(acc,x)-> int(x[1])+acc)""")).show()

#+----------+--------+--------+----------+------------------+
#|account_id|column_b|column_a|event_date|          column_c|
#+----------+--------+--------+----------+------------------+
#|         1|      23|      90| 2019-2-23|3.9130434782608696|
#|         1|      12|      45| 2019-2-28| 3.857142857142857|
#|         1|      38|      80| 2019-3-21|2.1052631578947367|
#|         1|      91|      62| 2019-3-24|1.1007751937984496|
#|         2|      29|      57| 2019-1-08|1.9655172413793103|
#|         2|      13|      68| 2019-1-12|2.9761904761904763|
#|         2|      14|      19| 2019-1-14|2.5714285714285716|
#|         2|      11|      21| 2019-3-29|1.9090909090909092|
#+----------+--------+--------+----------+------------------+

In [8]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Days -> seconds. Here -days(7) reaches a full seven days back.
days = lambda i: i * 86400

# Trailing frame per account, ordered by event time in epoch seconds.
window = (Window()
          .partitionBy(f.col("account_id"))
          .orderBy(f.col("event_date").cast("timestamp").cast("long"))
          .rangeBetween(-days(7), 0))

# Ratio of sums over the frame, computed from one collected window.
(df
 .withColumn("column_c", f.collect_list(f.array("column_a", "column_b")).over(window))
 .withColumn("column_c", f.expr("""aggregate(column_c,0,(acc,x)-> int(x[0])+acc)/\
                               aggregate(column_c,0,(acc,x)-> int(x[1])+acc)"""))
 .show())

In [9]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Days -> seconds.
days = lambda i: i * 86400

# Trailing seven-day frame per account, ordered by event time in epoch seconds.
window = (Window()
          .partitionBy(f.col("account_id"))
          .orderBy(f.col("event_date").cast("timestamp").cast("long"))
          .rangeBetween(-days(7), 0))

# Collect named structs (fields `a` and `b`) instead of two-element arrays, then
# print the physical plan rather than the rows.
(df
 .withColumn("column_c",
             f.collect_list(f.struct(f.col("column_a").alias("a"),
                                     f.col("column_b").alias("b"))).over(window))
 .withColumn("column_c", f.expr("""aggregate(column_c,0,(acc,x)-> int(x.a)+acc)/\
                               aggregate(column_c,0,(acc,x)-> int(x.b)+acc)"""))
 .explain())

In [10]:
# Days -> seconds.
days = lambda i: i * 86400

# Same trailing seven-day frame, but the ratio is computed with two separate window
# sums, so this plan can be compared with the collect_list version.
window = (
    Window()
    .partitionBy(f.col("account_id"))
    .orderBy(f.col("event_date").cast("timestamp").cast("long"))
    .rangeBetween(-days(7), 0)
)

# Output column is named "column_c", matching the other cells (was "colum_c").
df.withColumn(
    "column_c",
    F.sum(F.col("column_a")).over(window) / F.sum(F.col("column_b")).over(window),
).explain()


In [11]:
== Physical Plan ==
Window [id#16L, column_b#17L, column_a#18L, (cast(sum(column_a#18L) windowspecdefinition(id#16L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) as double) / cast(sum(column_b#17L) windowspecdefinition(id#16L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) as double)) AS colum_c#2635], [id#16L]
+- Sort [id#16L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#16L, 200), [id=#722]
      +- *(1) Scan ExistingRDD[id#16L,column_b#17L,column_a#18L]

In [13]:
# Ratio of two window sums over the `window` defined earlier. The output column is
# named "column_c", matching the other cells (was "colum_c").
df.withColumn(
    "column_c",
    F.sum(F.col("column_a")).over(window) / F.sum(F.col("column_b")).over(window),
).explain()

In [14]:
== Physical Plan ==
Window [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, (cast(sum(column_a#4850L) windowspecdefinition(account_id#4848L, _w0#6818L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -604800, currentrow$())) as double) / cast(sum(column_b#4849L) windowspecdefinition(account_id#4848L, _w0#6818L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -604800, currentrow$())) as double)) AS colum_c#6812], [account_id#4848L], [_w0#6818L ASC NULLS FIRST]
+- Sort [account_id#4848L ASC NULLS FIRST, _w0#6818L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(account_id#4848L, 200), [id=#1492]
      +- *(1) Project [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, cast(cast(event_date#4851 as timestamp) as bigint) AS _w0#6818L]
         +- *(1) Scan ExistingRDD[account_id#4848L,column_b#4849L,column_a#4850L,event_date#4851]

In [15]:
# Sum of per-row ratios within each account. Note that this is NOT sum(a)/sum(b).
# At this point `df` is the account_id frame and has no `id` column, so the
# window partitions by account_id.
window = Window().partitionBy("account_id")
df.withColumn("column_c", F.sum(F.col("column_a") / F.col("column_b")).over(window)).show()

In [16]:
# Ratio of sums within each account. At this point `df` is the account_id frame
# and has no `id` column, so the window partitions by account_id.
window = Window().partitionBy("account_id")
df.withColumn(
    "column_c",
    F.sum(F.col("column_a")).over(window) / F.sum(F.col("column_b")).over(window),
).show()

In [17]:
from pyspark.sql import functions as F

In [18]:
# Documents keyed by ID, each with its array of meaningful words.
list1 = [
    ['abcde00000qMQ00001', ['casa', 'alejado', 'buen', 'gusto']],
    ['abcde00000qMq00002', ['clientes', 'contentos', 'servi']],
    ['abcde00000qMQ00003', ['resto', 'bien']],
]

df1 = spark.createDataFrame(list1, schema=['ID', 'MeaningfulWords'])

df1.show()

In [19]:


# One score per word.
list2 = [
    [1.68, 'casa'],
    [2.8, 'alejado'],
    [1.03, 'buen'],
    [3.68, 'gusto'],
    [0.68, 'clientes'],
    [2.1, 'contentos'],
    [2.68, 'servi'],
    [1.18, 'resto'],
    [1.98, 'bien'],
]

df2 = spark.createDataFrame(list2, schema=['score', 'word'])

df2.show()

In [20]:
from pyspark.sql import functions as F

# Attach each word's score to every document that contains the word, then collect
# the scores and their mean per document. The word list is aliased with its real
# spelling "MeaningfulWords" (previously "MeaningfullWords").
# NOTE: collect_list order is not guaranteed after a join, so ScoreList need not
# follow the order of MeaningfulWords.
(df1.join(df2, F.expr("array_contains(MeaningfulWords, word)"))
    .groupBy("ID")
    .agg(F.first("MeaningfulWords").alias("MeaningfulWords"),
         F.collect_list("score").alias("ScoreList"),
         F.mean("score").alias("MeanScore"))
    .show(truncate=False))

In [21]:
from pyspark.sql import functions as F

# Same join as above, but MeanScore is computed from ScoreList with the higher-order
# aggregate(). The zero value must have the same type as the merge result: scores
# are doubles, so the zero is a double. An int zero (0) fails analysis with a type
# mismatch. The merge lambda takes (accumulator, element) in that order.
(df1.join(df2, F.expr("array_contains(MeaningfulWords, word)"))
    .groupBy("ID")
    .agg(F.first("MeaningfulWords").alias("MeaningfulWords"),
         F.collect_list("score").alias("ScoreList"))
    .withColumn("MeanScore", F.expr(
        "aggregate(ScoreList, cast(0 as double), (acc, x) -> acc + x, "
        "acc -> acc / size(ScoreList))"))
    .show(truncate=False))

In [22]:
df1.show()

#+------------------+----------------------------+
#|ID                |MeaningfulWords             |
#+------------------+----------------------------+
#|abcde00000qMQ00001|[casa, alejado, buen, gusto]|
#|abcde00000qMq00002|[clientes, contentos, servi]|
#|abcde00000qMQ00003|[resto, bien]               |
#+------------------+----------------------------+

df2.show()

#+-----+---------+
#|score|     word|
#+-----+---------+
#| 1.68|     casa|
#|  2.8|  alejado|
#| 1.03|     buen|
#| 3.68|    gusto|
#| 0.68| clientes|
#|  2.1|contentos|
#| 2.68|    servi|
#| 1.18|    resto|
#| 1.98|     bien|
#+-----+---------+


from pyspark.sql import functions as F

# Attach each word's score to every document that contains the word, then collect
# the scores and their mean per document. The word list keeps its real spelling,
# "MeaningfulWords".
# NOTE: collect_list order is not guaranteed after a join, so ScoreList need not
# follow the order of MeaningfulWords.
df1.join(df2, F.expr("""array_contains(MeaningfulWords,word)"""))\
   .groupBy("ID").agg(F.first("MeaningfulWords").alias("MeaningfulWords")\
                      ,F.collect_list("score").alias("ScoreList")\
                      ,F.mean("score").alias("MeanScore"))\
                      .show(truncate=False)

#+------------------+----------------------------+-----------------------+------------------+
#|ID                |MeaningfulWords             |ScoreList              |MeanScore         |
#+------------------+----------------------------+-----------------------+------------------+
#|abcde00000qMQ00003|[resto, bien]               |[1.18, 1.98]           |1.58              |
#|abcde00000qMq00002|[clientes, contentos, servi]|[0.68, 2.1, 2.68]      |1.8200000000000003|
#|abcde00000qMQ00001|[casa, alejado, buen, gusto]|[1.68, 2.8, 1.03, 3.68]|2.2975            |
#+------------------+----------------------------+-----------------------+------------------+

In [23]:
from pyspark.sql import functions as F
df1.join(df2, F.expr("""array_contains(MeaningfulWords,word)"""))\
   .groupBy("ID").agg(F.first("MeaningfulWords").alias("MeaningfullWords")\
                      ,F.collect_list("score").alias("ScoreList"))\
   .withColumn("MeanScore", F.expr("""aggregate((transform(ScoreList,x->int(x*100)))\
                                      ,0,(x,acc)-> acc+x,acc->(acc/100)/ size(Scorelist))""")).show(truncate=False)


In [24]:
# Sample rows: [Group, Risk Group, State, Value]. The variable is named `rows`
# instead of `list`, so the builtin `list` is not shadowed.
rows = [['A', 'yes', 1, 1000],
        ['A', 'no', 2, 100],
        ['A', 'no', 3, 100],
        ['A', 'no', 4, 100],
        ['A', 'no', 5, 100],
        ['B', 'yes', 1, 2000],
        ['B', 'no', 2, 200],
        ['B', 'no', 3, 100],
        ['B', 'no', 4, 100],
        ['B', 'no', 5, 200],
        ['C', 'yes', 1, 3000],
        ['C', 'no', 2, 100],
        ['C', 'no', 3, 100],
        ['C', 'no', 4, 200],
        ['C', 'no', 5, 200]]

df = spark.createDataFrame(rows, ['Group', 'Risk Group', 'State', 'Value'])

df.show()

In [25]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Spread each group's 'yes' Value onto all of its rows. when() gives null on the
# other rows, and sum ignores nulls. Then drop the 'yes' rows themselves.
w = Window().partitionBy("Group")
(df
 .withColumn("Risk Group Value",
             F.sum(F.when(F.col("Risk Group") == 'yes', F.col("Value"))).over(w))
 .filter(F.col("Risk Group") != 'yes')
 .orderBy("Group")
 .show())

In [26]:
Group - Risk Group - State - Value - Risk Group Value
A           no         2      100         1000
A           no         3      100         1000
A           no         4      100         1000
A           no         5      100         1000 
B           no         2      200         2000
B           no         3      100         2000
B           no         4      100         2000
B           no         5      200         2000
C           no         2      100         3000
C           no         3      100         3000
C           no         4      200         3000
C           no         5      200         3000

In [27]:
from pyspark.sql import functions as F

# Two rows, each with an array of string elements. The variable is named `rows`
# instead of `list`, so the builtin `list` is not shadowed.
rows = [[['a', 'b', 'b', 'c']],
        [['b', 'c', 'd']]]


df = spark.createDataFrame(rows, ['atr_list'])

df.show()

In [28]:
df.show()

#+------------+
#|    atr_list|
#+------------+
#|[a, b, b, c]|
#|   [b, c, d]|
#+------------+

from pyspark.sql import functions as F

elements = ['a', 'b', 'c', 'd']

# Total occurrences of each element across all rows. size(filter(...)) is already 0
# when the element is absent, so no array_contains/when guard is needed. Every
# element therefore sums to a number; previously b/c/d had no otherwise() and an
# element absent from all rows would have summed to null.
collected = df.agg(*[
    F.sum(F.expr("size(filter(atr_list, x -> x = '{}'))".format(e))).alias(e)
    for e in elements
]).collect()[0]

dict(zip(elements, collected))

Out[31]: {'a': 1, 'b': 3, 'c': 2, 'd': 1}

In [29]:
df.show()

#+------------+
#|    atr_list|
#+------------+
#|[a, b, b, c]|
#|   [b, c, d]|
#+------------+

elements = ['a', 'b', 'c', 'd']

# Same totals computed through one single-field struct per element; the unnamed
# field is read back as `col1`. The row is stored in `collected` so the dict below
# reflects THIS cell's result. Previously the row was discarded and the dict
# silently reused `collected` from an earlier cell.
collected = (
    df.withColumn("struct", F.struct(*[
        F.struct(F.expr("size(filter(atr_list,x->x={}))".format("'" + y + "'"))).alias(y)
        for y in elements
    ]))
    .select(*[F.sum(F.col("struct.{}.col1".format(x))).alias(x) for x in elements])
    .collect()[0]
)

dict(zip(elements, collected))
    
  

In [30]:
elements = ['a', 'b', 'c', 'd']

# One single-field struct per element holding that row's occurrence count (the
# unnamed field is read back as `col1`). Each count is then summed over all rows.
df.withColumn(
    "struct",
    F.struct(*[
        F.struct(F.expr("size(filter(atr_list,x->x='{}'))".format(name))).alias(name)
        for name in elements
    ]),
).select(*[
    F.sum(F.col("struct.{}.col1".format(name))).alias(name) for name in elements
]).show()

In [31]:
# The SQL filter expressions used above, one per element.
["size(filter(atr_list,x->x='{}'))".format(ch) for ch in 'abcd']

In [32]:
# NOTE(review): `counter` is not defined anywhere in this notebook, so evaluating
# the bare name raised NameError on a fresh kernel. It stays disabled until its
# source is known.
# counter

In [33]:
# Second element name ('b').
"abcd"[1]

In [34]:
# Per row: one struct per distinct element, holding the element and its count.
(df
 .withColumn("atr_list", F.expr("""transform(array_distinct(atr_list), x-> struct(x,\
                                  size(filter(atr_list,y->y=x))))"""))
 .show(truncate=False))

In [35]:
# Per row: one single-entry map per distinct element, mapping element -> count.
(df
 .withColumn("atr_list", F.expr("""transform(array_distinct(atr_list), x-> map(x,\
                                  size(filter(atr_list,y->y=x))))"""))
 .show(truncate=False))

In [36]:
# Pool every row's elements into one array, map each distinct element to its total
# count, and bring that array of maps back to the driver.
(df
 .agg(F.flatten(F.collect_list("atr_list")).alias("atr_list"))
 .withColumn("atr_list", F.expr("""transform(array_distinct(atr_list), x-> map(x,\
                                  size(filter(atr_list,y->y=x))))"""))
 .collect()[0][0])


In [37]:
# Reference answer: one row per element, then count per element.
(df
 .withColumn("atr_list", F.explode("atr_list"))
 .groupBy("atr_list")
 .count()
 .show())

In [38]:
# Count each distinct element per row as (item, sum) structs, then flatten the
# array into one row per struct with inline() before aggregating. Selecting
# "atr_list.item" / "atr_list.sum" directly yields whole ARRAYS, and sum() over an
# array column fails analysis.
(df
 .withColumn("atr_list", F.expr("""transform(array_distinct(atr_list), x -> struct(x as item,
                                  size(filter(atr_list, y -> y = x)) as sum))"""))
 .selectExpr("inline(atr_list)")
 .groupBy("item").agg(F.sum("sum").alias("SUM"))
 .show())

In [39]:
# For every element x of the row, count its occurrences in the row with aggregate().
# aggregate's merge lambda receives (accumulator, element) in that order. The
# previous version named them (y, acc), so it compared x against the running count
# and cast the element string to int.
(df
 .withColumn("atr_list2", F.expr("""transform(atr_list, x -> map(x,
     aggregate(atr_list, 0, (acc, y) -> IF(x = y, acc + 1, acc))))"""))
 .show(truncate=False))

In [40]:
# aggregate() needs at least an array, a zero value and a merge lambda. The empty
# call `aggregate()` fails analysis.
# NOTE(review): the intended expression is unknown. As a minimal working example,
# this counts the elements of each row.
df.withColumn("atr_list2", F.expr("""aggregate(atr_list, 0, (acc, x) -> acc + 1)"""))

In [41]:
# Number of days per key x. The next cell sums only the first `days` dates of each
# (x, flag) series.
df1 = spark.createDataFrame(
     [
     ('ll',5),
     ('yy',6),
     ],
     ('x','days')
    )

# Daily series per (x, flag). Note that 'flag' and 'value' are stored as strings.
df = spark.createDataFrame(
[
    ('ll','2020-01-05','1','10'),
    ('ll','2020-01-06','1','10'),
    ('ll','2020-01-07','1','10'),
    ('ll','2020-01-08','1','10'),
    ('ll','2020-01-09','1','10'),
    ('ll','2020-01-10','1','10'),
    ('ll','2020-01-11','1','10'),
    ('ll','2020-01-12','1','10'),
    ('ll','2020-01-05','2','30'),
    ('ll','2020-01-06','2','30'),
    ('ll','2020-01-07','2','30'),
    ('ll','2020-01-08','2','30'),
    ('ll','2020-01-09','2','30'),
    ('ll','2020-01-10','2','10'),
    ('ll','2020-01-11','2','10'),
    ('ll','2020-01-12','2','10'),
    ('yy','2020-01-05','1','20'),
    ('yy','2020-01-06','1','20'),
    ('yy','2020-01-07','1','20'),
    ('yy','2020-01-08','1','20'),
    ('yy','2020-01-09','1','20'),
    ('yy','2020-01-10','1','20'),
    ('yy','2020-01-11','1','20'),
    ('yy','2020-01-12','1','20'),
    ('yy','2020-01-05','2','40'),
    ('yy','2020-01-06','2','40'),
    ('yy','2020-01-07','2','40'),
    ('yy','2020-01-08','2','40'),
    ('yy','2020-01-09','2','40'),
    ('yy','2020-01-10','2','40'),
    ('yy','2020-01-11','2','40'),
    ('yy','2020-01-12','2','40')
     ],
    ('x','date','flag','value')
    )
df1.show()
df.show()

In [42]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Number each (x, flag) series by date, sum only its first `days` values (days comes
# from df1 per x), and broadcast that total to every row of the series.
# The dates are ISO 'yyyy-MM-dd'. The old pattern 'yyyy-dd-MM' parsed the day as the
# month and only ordered these rows correctly because every day here is <= 12.
w = Window().partitionBy("x", "flag").orderBy(F.to_date("date", "yyyy-MM-dd"))
w1 = Window().partitionBy("x", "flag")
# `value` is a string column. It is cast to int so the total is an integer;
# summing strings directly yields doubles (50.0 instead of 50).
(df.join(df1, ['x'])
   .withColumn("rowNum", F.row_number().over(w))
   .withColumn("expected_result",
               F.sum(F.when(F.col("rowNum") <= F.col("days"), F.col("value").cast("int")))
                .over(w1))
   .drop("days", "rowNum")
   .show())

In [43]:
+---+----------+----+-----+---------------+
    |  x|      date|flag|value|expected_result|
    +---+----------+----+-----+---------------+
    | ll|2020-01-05|   1|   10|             50|
    | ll|2020-01-06|   1|   10|             50|
    | ll|2020-01-07|   1|   10|             50|
    | ll|2020-01-08|   1|   10|             50|
    | ll|2020-01-09|   1|   10|             50|
    | ll|2020-01-10|   1|   10|             50|
    | ll|2020-01-11|   1|   10|             50|
    | ll|2020-01-12|   1|   10|             50|
    | ll|2020-01-05|   2|   30|            150|
    | ll|2020-01-06|   2|   30|            150|
    | ll|2020-01-07|   2|   30|            150|
    | ll|2020-01-08|   2|   30|            150|
    | ll|2020-01-09|   2|   30|            150|
    | ll|2020-01-10|   2|   10|            150|
    | ll|2020-01-11|   2|   10|            150|
    | ll|2020-01-12|   2|   10|            150|
    | yy|2020-01-05|   1|   20|            120|
    | yy|2020-01-06|   1|   20|            120|
    | yy|2020-01-07|   1|   20|            120|
    | yy|2020-01-08|   1|   20|            120|
    | yy|2020-01-09|   1|   20|            120|
    | yy|2020-01-10|   1|   20|            120|
    | yy|2020-01-11|   1|   20|            120|
    | yy|2020-01-12|   1|   20|            120|
    | yy|2020-01-05|   2|   40|            240|
    | yy|2020-01-06|   2|   40|            240|
    | yy|2020-01-07|   2|   40|            240|
    | yy|2020-01-08|   2|   40|            240|
    | yy|2020-01-09|   2|   40|            240|
    | yy|2020-01-10|   2|   40|            240|
    | yy|2020-01-11|   2|   40|            240|
    | yy|2020-01-12|   2|   40|            240|
    +---+----------+----+-----+---------------+

In [44]:
# Sample rows: [customer_id, Flag, trx_date as M/d/yyyy]. The variable is named
# `rows` instead of `list`, so the builtin `list` is not shadowed.
rows = [[1, 1, '12/3/2020'],
        [1, 0, '12/4/2020'],
        [1, 1, '12/5/2020'],
        [1, 1, '12/6/2020'],
        [1, 0, '12/7/2020'],
        [1, 1, '12/8/2020'],
        [1, 0, '12/9/2020'],
        [1, 0, '12/10/2020'],
        [1, 0, '12/11/2020'],
        [1, 1, '12/12/2020'],
        [2, 1, '12/1/2020'],
        [2, 0, '12/2/2020'],
        [2, 0, '12/3/2020'],
        [2, 1, '12/4/2020']]


df = spark.createDataFrame(rows, ['customer_id', 'Flag', 'trx_date'])
df.show()

In [45]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Running count of Flag == 1 rows per customer. Each Flag == 1 row opens a new group
# (Flag2), and the Flag == 0 rows after it belong to that group. Partitioning by
# customer keeps groups from spanning customers and avoids a single-partition window.
w = Window().partitionBy("customer_id").orderBy("trx_date")
# The whole group, used to find its earliest date: the date of the Flag == 1 row
# that opened it, when there is one.
w1 = Window().partitionBy("customer_id", "Flag2")
# 'M/d/yyyy' parses single-digit days such as '12/3/2020' ('dd' rejects them under
# Spark 3's parser) and formats results back in the same style. Rows are sorted on
# real dates before the dates are turned back into strings.
(df.withColumn("trx_date", F.to_date("trx_date", "M/d/yyyy"))
   .withColumn("Flag2", F.sum("Flag").over(w))
   .withColumn("destination",
               F.when(F.col("Flag") == 0, F.min("trx_date").over(w1))
                .otherwise(F.col("trx_date")))
   .orderBy("customer_id", "trx_date")
   .withColumn("trx_date", F.date_format("trx_date", "M/d/yyyy"))
   .withColumn("destination", F.date_format("destination", "M/d/yyyy"))
   .drop("Flag2")
   .show())

In [46]:
+-----------+----+----------+-----------+
|customer_id|Flag|  trx_date|destination|
+-----------+----+----------+-----------+
|          1|   1| 12/3/2020|  12/3/2020|
|          1|   0| 12/4/2020|  12/3/2020|
|          1|   1| 12/5/2020|  12/5/2020|
|          1|   1| 12/6/2020|  12/6/2020|
|          1|   0| 12/7/2020|  12/6/2020|
|          1|   1| 12/8/2020|  12/8/2020|
|          1|   0| 12/9/2020|  12/8/2020|
|          1|   0|12/10/2020|  12/8/2020|
|          1|   0|12/11/2020|  12/8/2020|
|          1|   1|12/12/2020| 12/12/2020|
|          2|   1| 12/1/2020|  12/1/2020|
|          2|   0| 12/2/2020|  12/1/2020|
|          2|   0| 12/3/2020|  12/1/2020|
|          2|   1| 12/4/2020|  12/4/2020|
+-----------+----+----------+-----------+

In [47]:
# Sample readings: [date, id, stat1, stat2, stat3]. The same six rows occur four
# times, so they are written once and repeated. This cell does not rebind the
# builtin name `list`.
df = spark.createDataFrame(
    [
        ['2020-01-13T22:22:10.000+0000', 1, 173736, 3043, 2996],
        ['2020-01-13T22:43:19.000+0000', 1, 173775, 3042, 2996],
        ['2020-01-14T22:43:19.000+0000', 1, 173775, 3042, 2996],
        ['2020-01-15T22:43:19.000+0000', 1, 173775, 3042, 2996],
        ['2020-01-13T22:22:10.000+0000', 2, 257624, 1500, 53],
        ['2020-01-13T22:43:19.000+0000', 2, 257625, 1500, 65],
    ] * 4,
    ['date', 'id', 'stat1', 'stat2', 'stat3'],
)

df.show(truncate=False)

In [48]:
# Fixed seed so the 30/30/40 split (and the printed counts) is reproducible across runs.
df1, df2, df3 = df.randomSplit([0.3, 0.3, 0.4], seed=24)

print(df1.count(), df2.count(), df3.count())

In [49]:
def split(df, weights=(0.2, 0.2, 0.2), seed=24, min_rows=2):
    """Randomly split ``df`` when it has more than ``min_rows`` rows, and show each piece.

    Args:
        df: DataFrame-like object with ``count``, ``randomSplit`` and ``show``.
        weights: relative split weights passed to ``randomSplit`` (equal by default).
        seed: seed passed to ``randomSplit`` so the split is reproducible.
        min_rows: ``df`` is split only when ``df.count()`` exceeds this value.

    Returns:
        A tuple of the pieces that were shown: the splits, or ``(df,)`` when ``df``
        is too small to split. Previously the function returned the results of
        ``show()``, which are ``None``.
    """
    if df.count() > min_rows:
        parts = tuple(df.randomSplit(list(weights), seed))
    else:
        parts = (df,)
    for part in parts:
        part.show()
    return parts
      
# Show the random split(s) of the sample readings frame.
split(df)


In [51]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep the latest reading per id and calendar day.
# The raw strings look like '2020-01-13T22:22:10.000+0000'. Only their first 19
# characters match the pattern, so that prefix is parsed explicitly instead of
# relying on the parser to ignore the trailing '.000+0000' (Spark 3's parser
# rejects unparsed trailing text).
w = Window().partitionBy("id", "date1").orderBy(F.col("date").desc())
(df.withColumn("date", F.to_timestamp(F.col("date").substr(1, 19), "yyyy-MM-dd'T'HH:mm:ss"))
   .withColumn("date1", F.to_date("date"))
   # Newest first, so row 1 is the latest reading of the group.
   .withColumn("rownum", F.row_number().over(w))
   .filter("rownum = 1")
   .drop("date1", "rownum")
   .orderBy("id", "date")
   .show(truncate=False))

In [52]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep every row that carries the latest timestamp of its (id, day) group. Unlike
# the row_number approach, exact duplicates of that timestamp are all kept. Only
# the first 19 characters ('yyyy-MM-ddTHH:mm:ss') are parsed, so the pattern
# matches the text exactly.
w = Window().partitionBy("id", "date1")
(df.withColumn("date", F.to_timestamp(F.col("date").substr(1, 19), "yyyy-MM-dd'T'HH:mm:ss"))
   .withColumn("date1", F.to_date("date"))
   .withColumn("last_date", F.max("date").over(w))
   .filter(F.col("date") == F.col("last_date"))
   .drop("date1", "last_date")
   .orderBy("id", "date")
   .show(truncate=False))

In [53]:
# Six ordered rows for id 'x'. Note that time_diff is stored as strings.
df = spark.createDataFrame(
    data=[
        ['x', 1, "9999"],
        ['x', 2, "120"],
        ['x', 3, "102"],
        ['x', 4, "3000"],
        ['x', 5, "299"],
        ['x', 6, "100"],
    ],
    schema=['id', "row_number", "time_diff"],
)

df.show()

In [54]:
# Keep row_number only on rows whose time_diff is at least 160, then forward-fill it
# with the last non-null value in row order. time_diff holds strings, so it is cast
# to int explicitly rather than relying on Spark's implicit string/int comparison.
w = Window().partitionBy("id").orderBy("row_number")
(df.withColumn("new_row_number",
               F.when(F.col("time_diff").cast("int") >= 160, F.col("row_number")))
   .withColumn("new_row_number", F.last("new_row_number", ignorenulls=True).over(w))
   .show())

In [55]:
# Numbers, each with a Letter. The variable is named `rows` instead of `list`, so
# the builtin `list` is not shadowed.
rows = [[5, 'a'],
        [2, 'b'],
        [3, 'c'],
        [6, 'a'],
        [7, 'b']]

dfviol = spark.createDataFrame(rows, ['Number', 'Letter'])


# Per-letter [low, high] ranges: one pair of columns for odd Numbers and another
# pair for even Numbers.
list1 = [[1, 'a', 1, 5, 6, 10],
         [2, 'a', 7, 9, 0, 4],
         [3, 'a', 11, 15, 10, 14],
         [4, 'b', 1, 5, 0, 4],
         [5, 'b', 7, 9, 6, 10],
         [6, 'c', 1, 5, 0, 4],
         [7, 'c', 7, 9, 6, 10],
         [8, 'c', 11, 15, 10, 14]]


dfcent = spark.createDataFrame(list1, ['ID', 'Letter', 'Num_range_low_odd', 'Num_range_high_odd',
                                       'Num_range_low_even', 'Num_range_high_even'])

dfviol.show()
dfcent.show()

In [56]:
# Choose the range columns by the parity of Number. when()/otherwise() take a Column
# (or a literal) as the value, so the boolean conditions are passed directly.
# Wrapping them in Python lists, as before, is not a valid value.
joinCondition = F.when(
    dfviol.Number % 2 == 0,
    (dfcent.Num_range_low_even <= dfviol.Number) & (dfcent.Num_range_high_even >= dfviol.Number),
).otherwise(
    (dfcent.Num_range_low_odd <= dfviol.Number) & (dfcent.Num_range_high_odd >= dfviol.Number)
)

df_full = dfviol.join(dfcent, [dfviol.Letter == dfcent.Letter, joinCondition], how='inner')
df_full.show()

In [57]:
# Earlier single-expression attempt, kept for reference:
#joinCondition = when(dfviol.Number%2== 0, [dfcent.Num_range_low_even <= dfviol.Number,dfcent.Num_range_high_even >= #dfviol.Number]).otherwise([dfcent.Num_range_low_odd <= dfviol.Number,dfcent.Num_range_high_odd >= dfviol.Number])

# Handle each parity separately: even Numbers join on the even range columns and
# odd Numbers on the odd ones. The two results are then stacked with union().
df1 = (
    dfviol
    .filter(F.col("Number") % 2 == 0)
    .join(dfcent,
          [dfviol.Letter == dfcent.Letter,
           dfcent.Num_range_low_even <= dfviol.Number,
           dfcent.Num_range_high_even >= dfviol.Number],
          how='inner')
)
df2 = (
    dfviol
    .filter(F.col("Number") % 2 != 0)
    .join(dfcent,
          [dfviol.Letter == dfcent.Letter,
           dfcent.Num_range_low_odd <= dfviol.Number,
           dfcent.Num_range_high_odd >= dfviol.Number],
          how='inner')
)

df1.union(df2).show()

In [58]:
# NOTE(review): this cell repeats the previous cell's computation.
#joinCondition = when(dfviol.Number%2== 0, [dfcent.Num_range_low_even <= dfviol.Number,dfcent.Num_range_high_even >= #dfviol.Number]).otherwise([dfcent.Num_range_low_odd <= dfviol.Number,dfcent.Num_range_high_odd >= dfviol.Number])

# Even Numbers must lie inside [low_even, high_even], odd ones inside [low_odd, high_odd].
df1 = dfviol.filter(F.col("Number") % 2 == 0).join(
    dfcent,
    on=[dfviol.Letter == dfcent.Letter,
        dfviol.Number >= dfcent.Num_range_low_even,
        dfviol.Number <= dfcent.Num_range_high_even],
    how='inner',
)
df2 = dfviol.filter(F.col("Number") % 2 != 0).join(
    dfcent,
    on=[dfviol.Letter == dfcent.Letter,
        dfviol.Number >= dfcent.Num_range_low_odd,
        dfviol.Number <= dfcent.Num_range_high_odd],
    how='inner',
)

df1.union(df2).show()

In [59]:
# The whole join condition in SQL: the letters must match, and Number must lie inside
# the even range for even Numbers or the odd range for odd Numbers. Each IF branch
# is a single boolean. The previous version returned arrays (not a valid join
# condition) and OR-ed the odd-range bounds together.
joinCondition = F.expr("""
    Letter1 = Letter AND
    IF(Number % 2 = 0,
       Num_range_low_even <= Number AND Num_range_high_even >= Number,
       Num_range_low_odd <= Number AND Num_range_high_odd >= Number)
""")

df_full = dfviol.withColumnRenamed("Letter", "Letter1").join(dfcent, joinCondition, how='inner')
df_full.show()

In [60]:
from pyspark.sql.functions import when

# Even Numbers must fall in the even range and odd Numbers in the odd range. The
# letters must also match (second join predicate below).
joinCondition = (
    when(dfviol.Number % 2 == 0,
         (dfcent.Num_range_low_even <= dfviol.Number) & (dfcent.Num_range_high_even >= dfviol.Number))
    .otherwise((dfcent.Num_range_low_odd <= dfviol.Number) & (dfcent.Num_range_high_odd >= dfviol.Number))
)

df_full = dfviol.join(
    dfcent,
    on=[dfviol.Letter == dfcent.Letter, joinCondition],
    how='inner',
)
df_full.show()

In [61]:
# Reference link, kept as a comment: a bare URL in a code cell is a SyntaxError.
# https://medium.com/@murtazahash