In [1]:
from gnutools import fs
from bpd import cfg
from bpd.dask import DataFrame, udf
from bpd.dask import functions as F
from gnutools.remote import gdrivezip

In [2]:
# Import a sample dataset: list the .wav files under the first path returned
# by gdrivezip for the archive configured at cfg.gdrive.google_mini, and wrap
# them in a single-column ("filename") DataFrame.
df = DataFrame(
    {
        "filename": fs.listfiles(
            gdrivezip(cfg.gdrive.google_mini)[0],
            [".wav"],
        )
    }
)
df.compute()

Unnamed: 0,filename
0,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/919...
1,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/6a2...
2,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/682...
3,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/beb...
4,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/d37...


In [3]:
# Register a user-defined function
@udf
def word(f):
    """Return ``fs.name`` of the parent of path ``f``.

    For the sample files this is the name of the folder that directly
    contains the recording (e.g. ``"wow"``).
    """
    parent_dir = fs.parent(f)
    return fs.name(parent_dir)

# Apply the udf to the "filename" column and store the result as "classe"
(
    df
    .withColumn("classe", word(F.col("filename")))
    .compute()
)

Unnamed: 0,filename,classe
0,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/919...,wow
1,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/6a2...,wow
2,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/682...,wow
3,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/beb...,wow
4,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/d37...,wow


In [4]:
# A udf can also be created inline by wrapping an existing function
# (here fs.name) instead of decorating a dedicated def.
(
    df
    .withColumn("name", udf(fs.name)(F.col("filename")))
    .display()
)

Unnamed: 0,filename,name
0,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/919...,919d3c0e_nohash_2
1,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/6a2...,6a27a9bf_nohash_0
2,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/682...,6823565f_nohash_2
3,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/beb...,beb49c22_nohash_1
4,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/d37...,d37e4bf1_nohash_0


In [5]:
# Retrieve 3 filenames per classe: group the rows by classe, reduce each
# group's filenames with top_k(..., 3), then explode the result back into
# one row per filename.
(
    df
    .withColumn("classe", word(F.col("filename")))
    .aggregate("classe")
    .withColumn("filename", F.top_k(F.col("filename"), 3))
    .explode("filename")
    .compute()
)

Unnamed: 0_level_0,filename
classe,Unnamed: 1_level_1
wow,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/919...
wow,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/6a2...
wow,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/682...


In [6]:
# Add the classe column to the original dataframe by rebinding df to the
# result of withColumn.
df = df.withColumn("classe", word(F.col("filename")))

# Display the modified dataframe
df.display()

Unnamed: 0,filename,classe
0,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/919...,wow
1,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/6a2...,wow
2,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/682...,wow
3,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/beb...,wow
4,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/d37...,wow


In [7]:
# Define a udf that returns the first character of a classe name
@udf
def initial(classe):
    """Return the first character of ``classe`` (e.g. ``"wow"`` -> ``"w"``)."""
    first_char = classe[0]
    return first_char
    

# Build a classe -> initial table: group by classe, turn the classe index
# back into a column, derive "initial" from it, keep only those two columns
# and index the result by classe again.
_df = (
    df
    .aggregate("classe")
    .reset_index(hard=False)
    .withColumn("initial", initial(F.col("classe")))
    .select(["classe", "initial"])
    .set_index("classe")
)

# Display the dataframe grouped by classe
_df.compute()
    

Unnamed: 0_level_0,initial
classe,Unnamed: 1_level_1
wow,w


In [8]:
# Group the classe/initial table by initial; each initial then carries the
# list of classes that start with it (e.g. w -> [wow]).
_df_initial = (
    _df
    .reset_index(hard=False)
    .aggregate("initial")
)
_df_initial.display()

Unnamed: 0_level_0,classe
initial,Unnamed: 1_level_1
w,[wow]


In [9]:
# Show the classe/initial table held in _df
_df.display()

Unnamed: 0,classe,initial
0,wow,w


In [10]:
# Join the dataframes: attach each file's initial via its classe, drop the
# per-row classe column, then join the per-initial list of classes.
(
    df
    .join(_df, on="classe")
    .drop_column("classe")
    .join(_df_initial, on="initial")
    .display()
)

Unnamed: 0,filename,initial,classe
0,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/919...,w,[wow]
0,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/6a2...,w,[wow]
0,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/682...,w,[wow]
0,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/beb...,w,[wow]
0,/tmp/1y4gwaS7LjYUhwTex1-lNHJJ71nLEh3fE/wow/d37...,w,[wow]
