In [None]:
import librosa
import librosa.display
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from datetime import datetime
from moviepy.editor import VideoFileClip
from google.colab import files
import os
from os import listdir
from os.path import join

In [None]:
import warnings
# Install a blanket 'ignore' filter: every warning raised after this cell is
# suppressed, including pandas/librosa warnings that may point at real problems.
warnings.filterwarnings('ignore')

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the input and output paths used by the
# cells below are all located under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


# **Functions**

In [None]:
def get_frame_rate(file_path):
    """Return the frame rate (fps) of the video at ``file_path``.

    Side effect: the value is also stored in the module-level ``frame_rate``
    variable, which other cells of this notebook read as a global.

    Parameters
    ----------
    file_path : str
        Path handed to ``VideoFileClip``.

    Returns
    -------
    The ``fps`` attribute reported by the opened clip.
    """
    global frame_rate

    video_clip = VideoFileClip(file_path)
    try:
        frame_rate = video_clip.fps
    finally:
        # Always release the reader, even if reading ``fps`` fails.
        video_clip.close()

    return frame_rate

In [None]:
def timestamp_to_seconds(timestamp, frame_rate):
    """Convert a colon-separated ``HH:MM:SS:FF`` timecode into seconds.

    The string is split on ``':'`` into exactly four integer fields
    (hours, minutes, seconds, frames); the frame count is turned into a
    fraction of a second by dividing it by ``frame_rate``.
    """
    fields = [int(part) for part in timestamp.split(':')]
    hours, minutes, seconds, frames = fields
    whole_seconds = hours * 3600 + minutes * 60 + seconds
    return whole_seconds + frames / frame_rate

In [None]:
def duration_cal(row, frame_rate):
    """Return the length in seconds of one transcript row.

    ``row`` must provide ``'Start.Timecode'`` and ``'End.Timecode'`` entries;
    both are converted with ``timestamp_to_seconds`` and the result is
    end minus start.
    """
    start_seconds, end_seconds = (
        timestamp_to_seconds(row[column], frame_rate)
        for column in ('Start.Timecode', 'End.Timecode')
    )
    return end_seconds - start_seconds

In [None]:
# Root folder on the mounted Drive; files are looked up under `<mypath>/Pair <n>/`.
mypath = '/content/drive/My Drive/closeness/Observation_Study_Segments'

def get_data(pair_num, participant_num, question_num):
    """Load one transcription sheet for a pair/participant/question.

    Reads sheet ``Participant<participant_num>_Q<question_num>`` of
    ``<mypath>/Pair <pair_num>/Pair<pair_num>_Transcriptions.xlsx`` without a
    header row, names the four columns, and strips spaces from the
    ``Speaker`` values.

    Returns
    -------
    pandas.DataFrame
        Columns ``Start.Timecode``, ``End.Timecode``, ``Speaker``,
        ``Transcript``.
    """
    workbook_path = os.path.join(mypath, f'Pair {pair_num}', f'Pair{pair_num}_Transcriptions.xlsx')

    transcript = pd.read_excel(
        workbook_path,
        sheet_name=f'Participant{participant_num}_Q{question_num}',
        header=None,
    )

    # The sheets carry no header row, so the column names are assigned here.
    transcript.columns = ['Start.Timecode', 'End.Timecode', 'Speaker', 'Transcript']
    # Remove spaces inside the speaker labels (e.g. 'Speaker 1' -> 'Speaker1').
    transcript['Speaker'] = transcript['Speaker'].str.replace(' ', '')

    return transcript

In [None]:
def _add_duration(segments, frame_rate):
    """Return a copy of ``segments`` with a float ``duration`` column (seconds)."""
    # Work on a copy: ``segments`` is a filtered slice of the transcript, and
    # writing a column onto a slice is what triggers SettingWithCopyWarning.
    segments = segments.copy()
    # A list comprehension (instead of DataFrame.apply) keeps this well-defined
    # when the speaker has no rows: apply(axis=1) on an empty frame returns a
    # DataFrame, which cannot be assigned to a single column.
    segments['duration'] = pd.Series(
        [duration_cal(row, frame_rate) for _, row in segments.iterrows()],
        index=segments.index,
        dtype=float,
    )
    return segments


def speaker(pair_num, participant_num, question_num):
    """Locate a participant's video and split the transcript by speaker.

    The video is searched for under ``<mypath>/Pair <pair_num>/`` using the
    two file-name variants ``Pair<pair>_<participant>.mp4`` and
    ``Pair<pair> <participant>.mp4`` (in that order).

    Parameters
    ----------
    pair_num, participant_num, question_num
        Identifiers interpolated into the folder, file and sheet names.

    Returns
    -------
    tuple
        ``(file_path, speak1_ts, speak2_ts)`` where the two DataFrames hold
        the rows labelled ``'Speaker1'`` / ``'Speaker2'`` plus a ``duration``
        column in seconds. A speaker without rows yields an empty frame.

    Raises
    ------
    FileNotFoundError
        If neither file-name variant exists.
    """
    pair_dir = os.path.join(mypath, f'Pair {pair_num}')
    candidates = [
        os.path.join(pair_dir, f'Pair{pair_num}_{participant_num}.mp4'),
        os.path.join(pair_dir, f'Pair{pair_num} {participant_num}.mp4'),
    ]
    file_path = next((path for path in candidates if os.path.exists(path)), None)
    if file_path is None:
        raise FileNotFoundError(
            'No video found for pair {} participant {}; tried: {}'.format(
                pair_num, participant_num, ', '.join(candidates)
            )
        )

    # get_frame_rate also publishes the value as the module-level `frame_rate`.
    frame_rate = get_frame_rate(file_path)

    ts_all = get_data(pair_num, participant_num, question_num)
    speak1_ts = _add_duration(ts_all[ts_all['Speaker'] == 'Speaker1'], frame_rate)
    speak2_ts = _add_duration(ts_all[ts_all['Speaker'] == 'Speaker2'], frame_rate)

    return file_path, speak1_ts, speak2_ts

In [None]:
def process_combined(df, file_path):
    """Load and concatenate the audio of every segment listed in ``df``.

    For each row, the segment starting at ``row['Start.Timecode']`` and
    lasting ``row['duration']`` seconds is loaded from ``file_path`` with
    ``librosa.load``. The start timecode is converted with the module-level
    ``frame_rate`` (set by ``get_frame_rate``).

    Parameters
    ----------
    df : pandas.DataFrame
        Needs ``'Start.Timecode'`` and ``'duration'`` columns.
    file_path : str
        Media file passed to ``librosa.load``.

    Returns
    -------
    tuple
        ``(combined_audio, sr, combined_durations)``: the concatenated samples
        (float64), the sample rate, and the list of per-row durations. For an
        empty ``df`` the audio is empty, the list is empty, and ``sr`` is
        librosa's default load rate.
    """
    # librosa.load is called without `sr`, so it resamples to its default
    # rate; use the same value when there is no segment to load at all
    # (previously `sr` was unbound and the return raised UnboundLocalError).
    sr = 22050

    segments = []
    combined_durations = []

    for _, row in df.iterrows():
        start_time = timestamp_to_seconds(row['Start.Timecode'], frame_rate)
        duration = row['duration']

        audio_segment, sr = librosa.load(file_path, offset=start_time, duration=duration)

        segments.append(audio_segment)
        combined_durations.append(duration)

    # Concatenate once instead of re-copying the growing array on every row.
    # The leading empty float64 array keeps the result dtype and the
    # empty-input result the same as the incremental version.
    combined_audio = np.concatenate([np.array([]), *segments])

    return combined_audio, sr, combined_durations

In [None]:
def seconds_to_timestamp(seconds):
    """Format a number of seconds as an ``HH:MM:SS:FF`` timecode string.

    The frame field is the fractional part of ``seconds`` multiplied by the
    module-level ``frame_rate`` (set by ``get_frame_rate``), truncated to an
    integer. Every field is zero-padded to at least two digits.
    """
    # Split off the fractional part *before* truncating to whole seconds;
    # previously `seconds` was overwritten with an int first, so the frame
    # field was always 00.
    whole, fraction = divmod(seconds, 1)
    total_minutes, secs = divmod(int(whole), 60)
    hours, minutes = divmod(total_minutes, 60)
    frames = int(fraction * frame_rate)
    return f"{hours:02}:{minutes:02}:{secs:02}:{frames:02}"

In [None]:
# Pitch track of the concatenated audio: one (time, pitch) pair per analysis frame.
def feature_extraction(combined_audio, sr, frame_rate):
    """Extract a per-frame pitch estimate from ``combined_audio``.

    For every analysis frame returned by ``librosa.core.piptrack`` the pitch
    of the bin with the largest magnitude is kept; frames where no bin has a
    positive magnitude get pitch ``0``.

    Parameters
    ----------
    combined_audio : numpy.ndarray
        Audio samples.
    sr : number
        Sample rate of ``combined_audio``.
    frame_rate : number
        Video frame rate. Kept for backward compatibility with existing
        callers; it is not used for the time axis (see below).

    Returns
    -------
    list of (float, number)
        ``(time_in_seconds, pitch)`` pairs, one per analysis frame, with time
        measured from the start of ``combined_audio``.
    """
    # 512 is piptrack's default hop (n_fft // 4 with n_fft=2048); it is passed
    # explicitly so the time conversion below is guaranteed to match it.
    hop_length = 512
    pitches, magnitudes = librosa.core.piptrack(y=combined_audio, sr=sr, hop_length=hop_length)

    result = []

    for t in range(pitches.shape[1]):
        magnitude = magnitudes[:, t]

        if np.any(magnitude > 0):
            pitch_value = pitches[magnitude.argmax(), t]
        else:
            pitch_value = 0

        # Analysis frame t starts at sample t * hop_length, i.e. at
        # t * hop_length / sr seconds. Dividing by the video frame rate (as
        # was done before) mis-scales the time axis, because analysis frames
        # and video frames have different durations.
        timestamp = t * hop_length / sr
        result.append((timestamp, pitch_value))

    return result

In [None]:
def pitch_with_timestamp(pitch):
    """Average pitch values that share the same formatted timestamp.

    ``pitch`` is a sequence of ``(seconds, pitch)`` pairs. Each time value is
    formatted with ``seconds_to_timestamp``; rows with an identical timestamp
    string are then averaged.

    Returns
    -------
    pandas.DataFrame
        Columns ``Timestamp`` and ``Pitch``, one row per distinct timestamp.
    """
    pitch_frame = pd.DataFrame(pitch, columns=['Timestamp', 'Pitch'])
    pitch_frame['Timestamp'] = pitch_frame['Timestamp'].apply(seconds_to_timestamp)

    mean_pitch = pitch_frame.groupby('Timestamp')['Pitch'].mean()
    return mean_pitch.reset_index()

In [None]:
def summary_run(pair_num, participant_num, question_num=None):
    """Run the full pitch pipeline for one pair/participant/question.

    Steps: split the transcript by speaker (``speaker``), load each speaker's
    audio (``process_combined``), extract pitch (``feature_extraction``) and
    aggregate it per timestamp (``pitch_with_timestamp``).

    Parameters
    ----------
    pair_num, participant_num
        Identifiers forwarded to ``speaker``.
    question_num : optional
        Question identifier forwarded to ``speaker``. When omitted, the
        module-level ``question_num`` variable is used, which is how this
        function obtained it before the parameter existed.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(pitch_1_df, pitch_2_df)`` for Speaker1 and Speaker2.

    Raises
    ------
    NameError
        If ``question_num`` is omitted and no module-level value is defined.
    """
    if question_num is None:
        # Backward compatibility with callers that rely on the notebook global.
        try:
            question_num = globals()['question_num']
        except KeyError:
            raise NameError("name 'question_num' is not defined") from None

    file_path, speak1_ts, speak2_ts = speaker(pair_num, participant_num, question_num)

    # load the audio
    combined_audio_1, sr, combined_durations_1 = process_combined(speak1_ts, file_path)
    combined_audio_2, sr, combined_durations_2 = process_combined(speak2_ts, file_path)

    # feature extraction; `frame_rate` is the module-level value published by
    # get_frame_rate while `speaker` ran above.
    pitch_1 = feature_extraction(combined_audio_1, sr, frame_rate)
    pitch_2 = feature_extraction(combined_audio_2, sr, frame_rate)

    # add timestamp
    pitch_1_df = pitch_with_timestamp(pitch_1)
    pitch_2_df = pitch_with_timestamp(pitch_2)

    return pitch_1_df, pitch_2_df

In [None]:
def export_output(pair_num, participant_num, pitch_1_df, pitch_2_df, output_folder=None):
    """Write the two per-speaker pitch tables to CSV files.

    Files are named
    ``pair<pair_num>_participant<participant_num>_speaker<1|2>_pitch.csv`` and
    written without the index column.

    Parameters
    ----------
    pair_num, participant_num
        Identifiers interpolated into the file names.
    pitch_1_df, pitch_2_df : pandas.DataFrame
        Tables for Speaker1 and Speaker2.
    output_folder : str, optional
        Destination folder. Defaults to the notebook's Drive output folder.
        The folder is created if it does not exist yet.
    """
    if output_folder is None:
        output_folder = '/content/drive/My Drive/closeness/csv_output/July/'

    # to_csv does not create missing directories, so make sure the target exists.
    os.makedirs(output_folder, exist_ok=True)

    file_path_1 = os.path.join(output_folder, f"pair{pair_num}_participant{participant_num}_speaker1_pitch.csv")
    file_path_2 = os.path.join(output_folder, f"pair{pair_num}_participant{participant_num}_speaker2_pitch.csv")

    pitch_1_df.to_csv(file_path_1, index=False)
    pitch_2_df.to_csv(file_path_2, index=False)

# **Run**

**Note:** Pair 11, Participant 1, Q3 only has Speaker2.

**Note:** Pair 12, Participant 1, Q3 has 2 rows of data that do not specify Speaker 1 or Speaker 2 (those 2 rows were skipped).

**Note:** Pair 20 doesn't have transcription data.

**Note:** Pair 31 doesn't have transcription data.

**Note:** Pair 33 doesn't have transcription data.

In [None]:
# only need to update here
# Run parameters, kept as strings because they are interpolated into folder,
# file and sheet names. `question_num` is read as a global by summary_run.
pair_num = '29'
participant_num = '2'
question_num = '8'

In [None]:
# Run the pipeline for the parameters set above; the question number is taken
# from the module-level `question_num`, not from an argument.
pitch_1_df,pitch_2_df = summary_run(pair_num,participant_num)

In [None]:
# Write one CSV per speaker; the question number is not part of the file names.
export_output(pair_num,participant_num, pitch_1_df, pitch_2_df)