**PER INSTALLARE PYSPARK**

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"
# install findspark using pip
!pip install -q findspark

In [3]:
#!pip install pyspark -q
#import pyspark

In [4]:
import findspark
findspark.init()

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [6]:
# mount your drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [7]:
from pyspark.sql import functions as func
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql.functions import concat,col,lit,desc, initcap
from operator import itemgetter
import numpy as np
import itertools 
from bisect import bisect
import pandas as pd


In [8]:
# loading data as parquet
pre_data_players = spark.read.parquet("/content/drive/MyDrive/Colab Notebooks/FootballDataAnalysis/data/Milan-Cagliari/parquet/frames_milan_cagliari.parquet").drop('index')
pre_events = spark.read.parquet("/content/drive/MyDrive/Colab Notebooks/FootballDataAnalysis/data/Milan-Cagliari/parquet/f24_opta_milan_cagliari.parquet").drop('index')

In [9]:
# visualize frames
pre_data_players.show(3)

+---------+----------+---------+------+----------+-----------+-------+---------+---------+--------------------+----------+-------------+
|      Vel|         x|        y|  Time|    player|       half|team_id|team_name|player_id|        italian_name|  position|real_position|
+---------+----------+---------+------+----------+-----------+-------+---------+---------+--------------------+----------+-------------+
|      0.0|       0.9|     33.2|5594.5|Castillejo|second half|   t120|    Milan|  p193449|samuel castillejo...|Midfielder|       Winger|
| 2.022165|0.83794284|     32.8|5594.7|Castillejo|second half|   t120|    Milan|  p193449|samuel castillejo...|Midfielder|       Winger|
|2.2488973|       0.8|32.578335|5594.8|Castillejo|second half|   t120|    Milan|  p193449|samuel castillejo...|Midfielder|       Winger|
+---------+----------+---------+------+----------+-----------+-------+---------+---------+--------------------+----------+-------------+
only showing top 3 rows



In [10]:
# visualize f24
pre_events.show(5)

+--------+-------+---------+---+---+-------+----------+----+----+---------+-------+--------------------+---------+------+----------+------------------+
|event_id|type_id|period_id|min|sec|team_id|   outcome|   x|   y|player_id|keypass|           type_name|team_name|player|  position|     real_position|
+--------+-------+---------+---+---+-------+----------+----+----+---------+-------+--------------------+---------+------+----------+------------------+
|       1|     34|       16|  0|  0|   t120|Successful| 0.0| 0.0|     None|   null|Team Set Up Succe...|     null|  null|      null|              null|
|       1|     34|       16|  0|  0|   t124|Successful| 0.0| 0.0|     None|   null|Team Set Up Succe...|     null|  null|      null|              null|
|       2|     32|        1|  0|  0|   t120|Successful| 0.0| 0.0|     None|   null|    Start Successful|     null|  null|      null|              null|
|       2|     32|        1|  0|  0|   t124|Successful| 0.0| 0.0|     None|   null|    S

In [11]:
#create parameters input boxes 
team = 'milan'
opponent = 'cagliari'
half = 'first half'
dict_half = {'first half':1,'second half':2}
half_split = '2'
player_in_possess = 'all'
offset_giocatori_eventi = {'cagliari':-2}

match_name = 'Milan-Cagliari'
team_a = match_name.split('-')[0].lower()
team_b = match_name.split('-')[1].lower()

if opponent == 'milan':
  opponent_f = team_b
else:
  opponent_f = opponent
  
events_to_consider = ['Ball Recovery Successful','Blocked Pass Successful','Interception Successful','Tackle Successful']

In [19]:
# FILTRIAMO IL TEMPO DI GIOCO

pre_data_players = pre_data_players.withColumnRenamed('Time','time')
pre_data_players = pre_data_players.withColumn('time', (pre_data_players['time']+offset_giocatori_eventi[opponent]).cast(DoubleType()))

pre_events_time = pre_events.withColumn('time', 60*pre_events['min']+pre_events['sec'])
pre_events_time = pre_events_time[pre_events_time['period_id'] == str(dict_half[half])]
max_times = pre_events_time.groupby('period_id').agg(func.min('time'),func.max('time'))

min_half_time = max_times.head()['min(time)']
max_half_time = max_times.head()['max(time)']

if half_split == '1':
  data_players = pre_data_players.where((pre_data_players['time'] > min_half_time) & (pre_data_players['time'] <= min_half_time+float((max_half_time-min_half_time)/2.0)))
  ppp_events = pre_events_time.where((pre_events_time['time'] > min_half_time) & (pre_events_time['time'] <= min_half_time+float((max_half_time-min_half_time)/2.0)))
  
elif half_split == '2':
  data_players = pre_data_players.where((pre_data_players['time'] > min_half_time+float((max_half_time-min_half_time)/2.0)) & (pre_data_players['time'] < max_half_time))
  ppp_events = pre_events_time.where((pre_events_time['time'] > min_half_time+float((max_half_time-min_half_time)/2.0)) & (pre_events_time['time'] < max_half_time))

In [20]:
pre_events_time.show(3), ppp_events.show(3)

+--------+-------+---------+---+---+-------+----------+----+----+---------+-------+----------------+---------+------+----------+------------------+----+
|event_id|type_id|period_id|min|sec|team_id|   outcome|   x|   y|player_id|keypass|       type_name|team_name|player|  position|     real_position|time|
+--------+-------+---------+---+---+-------+----------+----+----+---------+-------+----------------+---------+------+----------+------------------+----+
|       2|     32|        1|  0|  0|   t120|Successful| 0.0| 0.0|     None|   null|Start Successful|     null|  null|      null|              null|   0|
|       2|     32|        1|  0|  0|   t124|Successful| 0.0| 0.0|     None|   null|Start Successful|     null|  null|      null|              null|   0|
|       3|      1|        1|  0|  0|   t124|Successful|50.0|50.0|  p163632|   null| Pass Successful| Cagliari| Marin|Midfielder|Central Midfielder|   0|
+--------+-------+---------+---+---+-------+----------+----+----+---------+-------

(None, None)

In [13]:
ppp_events = ppp_events.withColumn('x', (ppp_events['x']*(112/100))-56.0).withColumn('y', (ppp_events['y']*(76/100))-38.0)

In [14]:
ppp_events.agg(func.min('x'),func.avg('x'),func.max('x'),func.min('y'),func.avg('y'),func.max('y')).show(10)

+------+------+------+------+------+------+
|min(x)|avg(x)|max(x)|min(y)|avg(y)|max(y)|
+------+------+------+------+------+------+
|  null|  null|  null|  null|  null|  null|
+------+------+------+------+------+------+



In [15]:
ppp_events.where(ppp_events['x'] < -52).show()

+--------+-------+---------+---+---+-------+-------+---+---+---------+-------+---------+---------+------+--------+-------------+----+
|event_id|type_id|period_id|min|sec|team_id|outcome|  x|  y|player_id|keypass|type_name|team_name|player|position|real_position|time|
+--------+-------+---------+---+---+-------+-------+---+---+---------+-------+---------+---------+------+--------+-------------+----+
+--------+-------+---------+---+---+-------+-------+---+---+---------+-------+---------+---------+------+--------+-------------+----+



In [16]:
data_players.agg(func.min('x'),func.avg('x'),func.max('x'),func.min('y'),func.avg('y'),func.max('y')).show(10)

+------+------+------+------+------+------+
|min(x)|avg(x)|max(x)|min(y)|avg(y)|max(y)|
+------+------+------+------+------+------+
|  null|  null|  null|  null|  null|  null|
+------+------+------+------+------+------+



In [69]:
team_goalie = data_players.where((data_players['team_name'] == team.title()) & (
    data_players['position'] == 'Goalkeeper')).groupBy('player').avg('x')
team_goalie_x = team_goalie.head()['avg(x)']

float(team_goalie_x)

TypeError: ignored

In [None]:

p_events = ppp_events.withColumn('_event_id',ppp_events['event_id'].cast(IntegerType())).orderBy('time').drop('event_id')

#ball_recovery_events = p_events.where(p_events['team_id'] == 't120').where(p_events['type_name'].isin(events_to_consider))

if team_goalie_x > 0:
  ball_lost_events = p_events.where(p_events['team_name'] != team.title()).where(p_events['type_name'].isin(events_to_consider)).where(p_events['x'] <= 0)
  verse = 'right_to_left'
  opponent_verse = 'left_to_right'
else:
  ball_lost_events = p_events.where(p_events['team_name'] != team.title()).where(p_events['type_name'].isin(events_to_consider)).where(p_events['x'] > 0)
  opponent_verse = 'right_to_left'
  verse = 'left_to_right'
  


In [None]:
verse

In [None]:
def aaa(x,panda):
  
  try:
    panda_pre = sorted(panda[panda['time'] > x[time_index]-10.0][panda['time'] < x[time_index]][panda['team_name'] != team.title()][['time','team_id','_event_id','player_id','type_name']].values.tolist())[-1]
  except:
    panda_pre = [0.0,'','','','']
    
  try:
    panda_time = panda[panda['type_name'].isin(events_to_consider)][panda['time'] == int(x[time_index])][panda['team_name'] != team.title()][['time','team_id','_event_id','player_id','type_name']].values.tolist()[0]
  except:
    panda_time = [0.0,'','','','']
    
  try:
    panda_post = sorted(panda[(panda['type_name'].isin(events_to_consider)) == False][panda['time'] >= x[time_index]][panda['time'] <= x[time_index]+10.0][panda['team_name'] != team.title()][['time','team_id','_event_id','player_id','type_name']].values.tolist())[0]
  except:
    panda_post = [0.0,'','','','']

  return panda_pre,panda_time,panda_post

ball_lost_events_rdd = ball_lost_events.rdd
panda_events = p_events.toPandas()

event_index = ball_lost_events.columns.index('_event_id')
time_index = ball_lost_events.columns.index('time')

pre_pre_pre_determined_timings = ball_lost_events_rdd.map(lambda g : aaa(g,panda_events))

pre_pre_determined_timings = pre_pre_pre_determined_timings.filter(lambda a : (('Successful' in str(a[2][-1])) and (str(a[2][-1]) != 'Ball Touch Successful')) or (str(a[2][-1]) == 'Pass Unsuccessful')).map(lambda a : float(np.round(a[1][0],1))).take(1000)

pre_determined_timings = list(dict.fromkeys(pre_pre_determined_timings))

for j in pre_determined_timings:
  for k in list(np.arange(j+1.0,j+7.0,1.0)):
    if k in pre_determined_timings:
      pre_determined_timings.remove(k)        
      
pre_pre_pre_determined_timings.take(1000), pre_determined_timings

In [None]:
events = ball_lost_events.where(ball_lost_events['time'].isin(pre_determined_timings))
#events = events.withColumn('x', (events['x']*(105/100))-52.5).withColumn('y', (events['y']*(68/100)-34.0))
events.agg(func.avg('x'),func.avg('y')).show(10)

In [None]:
team_a_lower = team_a.lower()
granularity = 0.5
max_ball_distance = 70.0
max_meters_distance_player_can_receive_ball = 30.0
opponent_lower = opponent.lower()

In [None]:
team_a_lower, opponent_lower

In [None]:
def slopeFuzzyCompare(slope_pre,slope_post):
  
  try:
    if float(np.abs(slope_pre-slope_post))>8.0:
      return 16 - float(np.abs(slope_pre-slope_post))
    else:
      return float(np.abs(slope_pre-slope_post))
  except:
    return 9.0
  
def ifHigherThanLimit(a,limit,maxMinFlag,offSideFlag):
  
  if offSideFlag == 'off-side':
    return 0.0
  else:
    if maxMinFlag == 'max':
      return float(np.min([limit,a]))
    elif maxMinFlag == 'min':
      return float(np.max([limit,a]))

def computeTime(current_speed, speed_threshold, acceleration_threshold, slopeCompare, distance):
  
    #going straight or slightly curving should not have an impact or reaction time
    #float(slopeCompare/8.0) means that if the player has to go in the completely opposite direction it will take 1 second longer than going straight (highest slopeCompare value is 8)
    reaction_distance = current_speed*0.7*(float(slopeCompare/8.0))
    real_distance = distance + reaction_distance
    
    time = float(real_distance)/float(speed_threshold)
    
    return time,reaction_distance

def verifyIfOffSide(playerData, offSideLine):
  
  player_team = playerData[2]
  try:
    offSideLine = list(filter(lambda a : a[1] != player_team, offSideLine))[0]

    if offSideLine[-1] == 'left_to_right' and playerData[3] < offSideLine[2]:
      return 'off-side'
    elif offSideLine[-1] == 'right_to_left' and playerData[3] > offSideLine[2]:
      return 'off-side'
    else:
      return ''
  except:
    return ''

In [None]:
pre_data = data_players.withColumn('speed', data_players['Vel'].cast(DoubleType())).withColumn('x', data_players['x'].cast(DoubleType())).withColumn('y', data_players['y'].cast(DoubleType())).drop('Vel')
pre_data = pre_data.where(pre_data['player'] != 'ball').withColumnRenamed('player_a','player_id')

pre_players_bis = pre_data.withColumnRenamed('speed','smoothed_speed')

pre_players_bis.show(4)

In [None]:
x_loc = list(np.arange(-55.0,55.0,1.0))
y_loc = list(np.arange(-37.0,37.0,1.0))

In [None]:
df_players_no_out = pre_players_bis.withColumn('smoothed_acceleration', lit(0.0)).select('player','time','team_name','x','y','smoothed_speed','smoothed_acceleration')

In [None]:
pd_locs = df_players_no_out.where(pre_players_bis['time'].isin(pre_determined_timings)).toPandas()

In [None]:
# aggiungere il campo Nome Giocatore e Nome Squadra 
team_a_passes_pivot = events.select('time','x', 'y', 'team_id', 'player')
team_a_passes_pivot = team_a_passes_pivot.withColumnRenamed('x','start_x').withColumnRenamed('y','start_y').withColumnRenamed('player','player_a').withColumnRenamed('team_name','team')

team_a_passes_pivot.show(4)

In [None]:
timings_rdd = spark.sparkContext.parallelize(pre_determined_timings)
timings_rdd.take(10)

In [None]:
panda_out = df_players_no_out.orderBy('time').toPandas()

panda_out

In [None]:
players_list = list(map(lambda d : d[0], df_players_no_out.select('player').distinct().collect()))

team_players_list = list(map(lambda d : d[0], df_players_no_out.where(df_players_no_out['team_name'] != team.title()).select('player').distinct().collect()))

players_list, team_players_list

In [None]:
def findIfOffside(opponents_x,team,player_x,verse):
  
  if str(team) == str(opponent.title()):
    if verse == 'left_to_right':
      if player_x > opponents_x[-2]:
        return 'off-side'
      else:
        return ''
    else:
      if player_x < opponents_x[1]:
        return 'off-side'
      else:
        return ''     
  else:
    return ''
  
def fillInfos(t,panda):
  
  obs = []
  opponents_x_s = sorted(panda[panda['team_name'] == 'Milan'][panda['time'] == t]['x'].values.tolist())
  
  for p in players_list:
    pt = panda[panda['player'] == p][panda['time'] == t].values.tolist()
    pre_pt = panda[panda['player'] == p][panda['time'] < t][['x','y']].values.tolist()
    if len(pt) > 0:
      try:
        pre_pt = panda[panda['player'] == p][panda['time'] < t][['x','y']].values.tolist()[-1]
        obs.append(pt[0]+pre_pt+[findIfOffside(opponents_x_s,pt[0][2],pt[0][3],opponent_verse)])
      except:
        pass
    else:
      try:
        pre_value = panda[panda['player'] == p][panda['time'] < t].values.tolist()[-1]
        post_value = panda[panda['player'] == p][panda['time'] > t].values.tolist()[0]
        percentage_of_t_wrt_pre_post = float(t-pre_value[1])/float(post_value[1]-pre_value[1])
        percentage_of_pre_t_wrt_pre_post = float(t-0.5-pre_value[1])/float(post_value[1]-pre_value[1])
        new_x = pre_value[3]+(percentage_of_t_wrt_pre_post*(float(post_value[3]-pre_value[3])))
        new_y = pre_value[4]+(percentage_of_t_wrt_pre_post*(float(post_value[4]-pre_value[4])))
        new_speed = pre_value[5]+(percentage_of_t_wrt_pre_post*(float(post_value[5]-pre_value[5])))
        new_pre_x = pre_value[3]+(percentage_of_pre_t_wrt_pre_post*(float(post_value[3]-pre_value[3])))
        new_pre_y = pre_value[4]+(percentage_of_pre_t_wrt_pre_post*(float(post_value[4]-pre_value[4])))
        obs.append([p,t,pre_value[2],new_x,new_y,new_speed,0.0,new_pre_x,new_pre_y,findIfOffside(opponents_x_s,pre_value[2],new_x,verse)])
      except:
        pass
      
  return obs
  
case_ = spark.createDataFrame(timings_rdd.flatMap(lambda a : fillInfos(a,panda_out)), df_players_no_out.columns+['pre_x','pre_y','offside_flag'])

case_.where(case_['offside_flag'] != '').orderBy('time').show(5)

In [None]:
pd_ball_gapped = team_a_passes_pivot.select('time','player_a','start_x','start_y').withColumnRenamed('start_x','x').withColumnRenamed('start_y','y').toPandas()

pd_ball_gapped[pd_ball_gapped['time'].isin(pre_determined_timings)]

In [None]:
pre_case = case_.withColumn('slope', (case_['y']-case_['pre_y'])/(case_['x']-case_['pre_x']))
pre_case_s = pre_case.withColumn('x_spread', pre_case['x']-pre_case['pre_x'])
pre_case_s = pre_case_s.withColumnRenamed('player','player_id').withColumnRenamed('team_name','team')

pre_case_ss = pre_case_s.select('player_id','time','team','x','y','smoothed_speed','smoothed_acceleration','pre_x','pre_y','offside_flag','slope','x_spread')

In [None]:
bisectList = [-2.4142,-1,-0.4142,0,0.4142,1,2.4142]

def slopeFuzzy(slope,x_spread,bisectList):
  
  try:
    slope_index = float(bisect(bisectList,slope))+5.0

    if x_spread > 0:
      sign = np.sign(8.1-slope_index)
      return float(slope_index)+float(sign)*8.0
    else:
      return slope_index
  except:
    return 0
  
slope_index = pre_case_ss.columns.index('slope')
x_spread_index = pre_case_ss.columns.index('x_spread')

case = spark.createDataFrame(pre_case_ss.rdd.map(lambda a : [str(a[0]).replace('_',' ')]+list(a[1:])+[slopeFuzzy(a[slope_index],a[x_spread_index],bisectList)]),pre_case_ss.columns+['slope_fuzzy']).orderBy('time')

In [None]:
case.head()

In [None]:
time_xy_list = [pre_determined_timings,x_loc,y_loc]

timings_locs_rdd = spark.sparkContext.parallelize(list(itertools.product(*time_xy_list)))

timings_locs_rdd.take(1)[0]

In [None]:
panda_to_use = case.toPandas()

In [None]:
def addLocs(time,x,y,pandaBall,pandaPlayers):
  
  try:
    x_ball,y_ball = pandaBall[pandaBall['time'] == float(time)][['x','y']].values.tolist()[0]

    players_f = pandaPlayers[pandaPlayers['time'] == float(time)][pandaPlayers['offside_flag'] == ''].values.tolist()

    out = []

    results = []
    for a in players_f:
      slope_target = float(y-a[4])/float(x-a[3])
      distance = float(np.sqrt(float((y-a[4])**2)+float((x-a[3])**2)))
      slope_target_fuzzy = slopeFuzzy(slope_target,x-a[3],bisectList)
      slope_fuzzy = a[-1]
      slope_compare = slopeFuzzyCompare(slope_fuzzy,slope_target_fuzzy)
      player_time, reaction_distance = computeTime(a[5],6.0,7.0,slope_compare,distance)
      ball_distance = float(np.sqrt((x_ball-x)**2 + (y_ball-y)**2))
      ball_time = ball_distance/15.0
      results.append([float(x),float(y),x_ball,y_ball,distance,slope_compare,slope_target_fuzzy,slope_compare,ball_distance,ball_time,player_time,reaction_distance,ball_time - player_time,a[-4],slope_fuzzy]+list(a[:6]))  
      try:
        no_off_side = list(filter(lambda a : a[13] == '', results))
        best_time = sorted(results, key = itemgetter(10), reverse = False)[0][10] 
        results_with_best = list(map(lambda d : list(d)+[best_time, d[10] - best_time], results))
        filtered_results_with_best = list(filter(lambda a : (a[4]< max_meters_distance_player_can_receive_ball) and (a[-1]< 0.5) and (a[8] < max_ball_distance),results_with_best))
        length = len(filtered_results_with_best)
        avg_time_spread = float(np.mean(list(map(lambda d : d[-1], filtered_results_with_best))))
        results_with_best_and_length = list(map(lambda d : list(d)+[length,ifHigherThanLimit(ifHigherThanLimit((1+1.4*(-d[-1]+avg_time_spread))*(100.0/float(length)),100,'max',d[12]),0,'min',d[12])], filtered_results_with_best))

        #out = list(map(lambda d : [d[0],d[1],d[8],d[5],d[14],d[15],d[16],d[17],d[18],d[9],d[10],d[-4],d[-3],d[-2],d[-1]], results_with_best_and_length))
        out = list(map(lambda d : [d[0],d[1],d[8],d[5],d[14],d[15],d[16],d[17],d[18],d[19],d[-1]], results_with_best_and_length))
      except:
        out = []

    return out
  except:
    return [[0.0,0.0,0.0,0.0,0.0,'',time,'',0.0,0.0,0.0]]


In [None]:
offside_players = case.where(case['offside_flag'] != '').withColumn('target_y', lit(10000)).withColumn('target_x', lit(10000)).withColumn('probability', lit(0.0)).withColumn('distance_from_target', lit(0.0)).withColumn('ball_distance', lit(0.0)).withColumn('slope_fuzzy_compare', lit(100)).select('target_x','target_y','ball_distance','slope_fuzzy_compare','player_id','time','team','x','y','probability','distance_from_target').distinct()

offside_players.show(2)

In [None]:
timings_players = spark.createDataFrame(timings_locs_rdd.flatMap(lambda d : addLocs(d[0],d[1],d[2],pd_ball_gapped,panda_to_use)), ['target_x','target_y','ball_distance','slope_fuzzy_compare','slope','player_id','time','team','x','y','probability'])

timings_players = timings_players.withColumn('distance_from_target', func.sqrt(((timings_players['target_x']-timings_players['x'])**2)+((timings_players['target_y']-timings_players['y'])**2))).drop('slope')

timings_players.show()

In [None]:
possess_out = team_a_passes_pivot.select('time','player_a','start_x','start_y').withColumnRenamed('start_x','x').withColumnRenamed('start_y','y').withColumn('target_x', lit(-999.0)).withColumn('target_y', lit(-999.0)).withColumn('probability', lit(-999.0)).withColumn('distance_from_target', lit(-999.0)).withColumnRenamed('player_a','player_id').withColumn('team', lit(team_a)).withColumn('slope_fuzzy_compare',lit(100)).withColumn('ball_distance',lit(100))

In [None]:
possess_out_selected = possess_out.select(offside_players.columns)
possess_out_selected = possess_out_selected.withColumn('team', func.initcap(possess_out_selected['team']))

In [None]:
possess_out_selected.show()

In [None]:
timings_players_out = timings_players.union(possess_out_selected).union(offside_players).distinct()

In [None]:
new_path = 'pitch_control_'+team +"_"+str(opponent)+"_"+str(half).replace(' ','_')+"_"+str(half_split)+"_"+player_in_possess

In [None]:
temp_path = "/content/drive/MyDrive/Colab Notebooks/FootballDataAnalysis/data/Milan-Cagliari/parquet/__temp"
target_path = "/content/drive/MyDrive/Colab Notebooks/FootballDataAnalysis/data/Milan-Cagliari/parquet/"+new_path+".csv"

timings_players_out.coalesce(1).write.mode("overwrite").option('delimiter',',').option('header','true').csv(temp_path)

Path = spark.sparkContext._gateway.jvm.org.apache.hadoop.fs.Path

# get the part file generated by spark write
fs = Path(temp_path).getFileSystem(spark.sparkContext._jsc.hadoopConfiguration())
csv_part_file = fs.globStatus(Path(temp_path + "/part*"))[0].getPath()

# move and rename the file
fs.rename(csv_part_file, Path(target_path))
fs.delete(Path(temp_path), True)

In [None]:
new_path

In [None]:
#timings_players_out.write.mode('overwrite').parquet('/content/drive/MyDrive/Colab Notebooks/FootballDataAnalysis/data/Milan-Cagliari/parquet/' + 'pitch_control_' + name_path + '.parquet')

In [None]:
### END ####