In [None]:
import mido as md
from mido import MidiFile
from mido import MetaMessage
import os
import numpy as np
import pandas as pd
from sklearn.cluster import DBSCAN
import statistics as stats

## Path request, check and MIDI file import

In [None]:
def importMidi(rollMidiPath):
    """Open the MIDI file at ``rollMidiPath`` and publish it through module globals.

    Globals written:
        rollMidiFile  -- the ``MidiFile`` object built from the path
        numMidiTracks -- number of entries in ``rollMidiFile.tracks``
        ticksPerBeat  -- the file's ``ticks_per_beat`` value
    """
    global rollMidiFile, numMidiTracks, ticksPerBeat

    rollMidiFile = MidiFile(rollMidiPath)
    loadedTracks = rollMidiFile.tracks
    numMidiTracks = len(loadedTracks)
    ticksPerBeat = rollMidiFile.ticks_per_beat

## Dataframe creation

In [None]:
def dfCreate(rollMidiFile):
    """Build the 2-D note list from the last track of ``rollMidiFile``.

    Globals written:
        tempo        -- first ``tempo`` value found in track 0, or 500000
                        (the MIDI default, 120 bpm) when track 0 carries none
        columnsNames -- names of the ten columns of a ``notesList`` row
        notesList    -- one row per sounding note-on, in track order

    Row layout (see ``columnsNames``):
        evntNmb     position of the note-on message inside the track
        msg         always "note on"
        note        MIDI note number
        dtNoteOn    delta time of the note-on message
        tDtNoteOn   cumulative delta time at the note-on
        dtNoteOff   delta time of the matching note-off ("tbp" until matched)
        tDtNoteOff  cumulative delta time at the note-off ("tbp" until matched)
        noteLng     tDtNoteOff - tDtNoteOn ("tbp" until matched)
        IOI         onset distance from the previous note-on (from 0 for the first)
        cluster     "/" placeholder, filled in by the clustering steps

    A ``note_on`` with velocity 0 is treated as a note-off. Each note-off closes
    the oldest still-open note-on of the same note number.
    """
    global tempo # <-- global MIDI file tempo (fixed to 120bpm anyway)
    global columnsNames # <-- global: names of the columns of the 2d Python list
    global notesList # <-- global: 2d Python list with notes properties

    columnsNames = ["evntNmb", "msg", "note", "dtNoteOn", "tDtNoteOn", "dtNoteOff", "tDtNoteOff", "noteLng", "IOI", "cluster"]
    notesList = []

    # Always (re)assign the tempo: without a default, a file lacking a tempo
    # message would silently inherit the tempo of the previously analysed file.
    tempo = 500000
    for msg in rollMidiFile.tracks[0]:
        msgTempo = getattr(msg, "tempo", None)
        if msgTempo is not None:
            tempo = msgTempo
            break

    # Notes are read from the last track of the file that was actually passed in.
    noteTrack = rollMidiFile.tracks[-1]

    pendingByNote = {}   # note number -> rows still waiting for their note-off (oldest first)
    totalDeltaTime = 0

    for counter, msg in enumerate(noteTrack):
        totalDeltaTime += getattr(msg, "time", 0)

        if not hasattr(msg, "note"):
            continue    # not a note message

        if msg.type == "note_on" and msg.velocity != 0:
            previousOnset = notesList[-1][4] if notesList else 0
            # "tbp" stands for "to be populated"
            row = [counter, "note on", msg.note, msg.time, totalDeltaTime,
                   "tbp", "tbp", "tbp", totalDeltaTime - previousOnset, "/"]
            notesList.append(row)
            pendingByNote.setdefault(msg.note, []).append(row)

        elif msg.type == "note_off" or (msg.type == "note_on" and msg.velocity == 0):
            waiting = pendingByNote.get(msg.note)
            if waiting:
                row = waiting.pop(0)
                row[5] = msg.time
                row[6] = totalDeltaTime
                row[7] = row[6] - row[4]

## Definition of "sync" notes and DBSCAN parameters

In [None]:
def paramsDef():
    """Reset the tunable analysis parameters to their starting values.

    ``iterativeDbscan`` changes ``minNoteLen``, ``dbEps`` and ``dbMinSamples``
    between passes, so this has to be called again before each new file.
    """
    global syncVal # <-- global sync value
    global minNoteLen # <-- global minimum note length
    global dbEps # <-- global epsilon DBSCAN
    global dbMinSamples # <-- global MinSamples DBSCAN

    syncVal = 6         # a note followed by an onset at most this far away is marked "sync"
    minNoteLen = 0      # rows are kept for clustering only while IOI > minNoteLen
    dbEps = 4           # `eps` passed to DBSCAN
    dbMinSamples = 20   # `min_samples` passed to DBSCAN

## Update of notesList with "sync" notes

In [None]:
def syncDef(notesList):
    """Mark "sync" notes in ``notesList`` (in place) and build the note dataframes.

    A row whose successor has an IOI not greater than ``syncVal`` gets its own
    IOI replaced by the string "sync"; the IOI it held is carried forward and
    added to the next row that is not marked.

    Globals written:
        pdNotesList       -- dataframe of every row of ``notesList``
        pdNotesListNoSync -- the rows whose IOI is not "sync"
        totalNotes        -- row count of ``pdNotesList``
        totalValidNotes   -- row count of ``pdNotesListNoSync``
    """
    global pdNotesList, pdNotesListNoSync, totalNotes, totalValidNotes

    carriedIoi = 0
    finalIdx = len(notesList) - 1

    for idx, row in enumerate(notesList):
        hasSuccessor = idx < finalIdx
        if hasSuccessor and notesList[idx + 1][8] <= syncVal:
            # the successor starts (almost) together with this note
            carriedIoi += row[8]
            row[8] = "sync"
            continue
        row[8] += carriedIoi
        if hasSuccessor:
            carriedIoi = 0

    allNotes = pd.DataFrame(notesList, columns=columnsNames)
    notSyncMask = allNotes['IOI'] != 'sync'

    pdNotesList = allNotes
    pdNotesListNoSync = allNotes[notSyncMask]
    totalNotes = len(allNotes)
    totalValidNotes = len(pdNotesListNoSync)

## DBSCAN

In [None]:
def preDbScanFunc():
    """Prepare the shared state for the iterative DBSCAN passes.

    Drops the "sync" rows from ``pdNotesList``, then the rows whose IOI is not
    greater than ``minNoteLen``, and resets the bookkeeping globals:
    ``endProcess`` to ``[False]`` (so the first pass runs), ``totalClusters`` to 0
    and both cluster summary lists to empty.
    """
    global pdNotesList, endProcess, totalClusters
    global clustersMeansList, clustersMediansList

    endProcess = [False]

    # Two separate steps: the numeric comparison must never see the "sync" strings.
    withoutSync = pdNotesList.loc[pdNotesList['IOI'] != "sync"]
    pdNotesList = withoutSync.loc[withoutSync['IOI'] > minNoteLen]

    totalClusters = 0
    clustersMeansList, clustersMediansList = [], []

def dbScanFunc():
    """Run one DBSCAN pass over the IOI values left in ``pdNotesList``.

    Rows whose IOI is not greater than ``minNoteLen`` are dropped first. Every
    cluster found (DBSCAN's noise label -1 is not a cluster) is then checked:
    it is "valid" when (population std-dev <= 2 or range < 10) and its
    coefficient of variation is at most 4 %.

    For each valid cluster its rows are removed from ``pdNotesList``,
    ``totalClusters`` is incremented and the rounded mean / median are appended
    to ``clustersMeansList`` / ``clustersMediansList``.

    ``endProcess`` is rebuilt with one boolean per cluster (True = valid). It is
    left empty when there was nothing to cluster or only noise was found, which
    makes the ``False in endProcess`` loop in ``iterativeDbscan`` stop.
    """
    global pdNotesList
    global endProcess
    global totalClusters

    # .copy() so the label assignment below writes to an independent frame,
    # not to a filtered view of an earlier one.
    pdNotesList = pdNotesList[pdNotesList['IOI'] > minNoteLen].copy()
    endProcess = []

    if pdNotesList.empty:
        return  # DBSCAN.fit() raises on an empty sample set

    # DBSCAN expects a 2-D (n_samples, n_features) array; here one feature: the IOI
    ioiSamples = pdNotesList['IOI'].to_numpy(dtype=float).reshape(-1, 1)
    dbs = DBSCAN(eps=dbEps, min_samples=dbMinSamples).fit(ioiSamples)
    pdNotesList['cluster'] = dbs.labels_

    # Enumerate the labels that actually occur. Counting unique labels and
    # subtracting one is only right when the noise label -1 happens to be present.
    clusterLabels = [int(label) for label in np.unique(dbs.labels_) if label != -1]

    for label in clusterLabels:
        clusterIoiList = pdNotesList.loc[pdNotesList['cluster'] == label, 'IOI'].tolist()
        clusterRange = max(clusterIoiList) - min(clusterIoiList)
        clusterMean = stats.mean(clusterIoiList)
        clusterMedian = stats.median(clusterIoiList)
        clusterSd = stats.pstdev(clusterIoiList)
        clusterCv = clusterSd / clusterMean

        if (clusterSd <= 2 or clusterRange < 10) and clusterCv * 100 <= 4:
            pdNotesList = pdNotesList[pdNotesList['cluster'] != label]
            totalClusters += 1
            clustersMeansList.append(round(clusterMean))
            clustersMediansList.append(round(clusterMedian))
            endProcess.append(True)
        else:
            endProcess.append(False)
            
def iterativeDbscan():
    """Repeat ``dbScanFunc`` with progressively stricter parameters, then report.

    Passes continue while the last pass left at least one invalid cluster
    (``False in endProcess``), for at most seven passes. After each pass
    ``dbMinSamples`` grows by 5 (while below 50), ``dbEps`` shrinks by 1
    (while above 1) and ``minNoteLen`` grows by 4 (while below 30).

    A summary is printed afterwards and ``outliersCountFunc`` is called.
    The header line shows the module global ``midiFile`` when it exists.
    """
    global minNoteLen # <-- global minimum note length
    global dbEps # <-- global epsilon DBSCAN
    global dbMinSamples # <-- global MinSamples DBSCAN
    global counter

    counter = 1

    while False in endProcess and counter < 8:
        counter += 1
        dbScanFunc()
        if dbMinSamples < 50:
            dbMinSamples += 5
        if dbEps > 1:
            dbEps -= 1
        if minNoteLen < 30:
            minNoteLen += 4

    # `midiFile` is only defined when the batch loop drives the analysis;
    # fall back to a placeholder instead of failing with a NameError.
    fileLabel = globals().get("midiFile", "<unknown MIDI file>")

    notesLeft = len(pdNotesList)
    # guard the percentage against a file without any non-sync note
    percentLeft = round(100 * notesLeft / totalValidNotes) if totalValidNotes else 0

    print("\n\n\033[1m" + str(fileLabel) + "\033[0m" )
    print("Total number of rhythmic events: " + str(totalNotes))
    print("Total number of non synced rhythmic events: " + str(totalValidNotes))
    print("Total number of valid clusters: " + str(totalClusters))
    print("Number of notes left: " + str(notesLeft) + " (" + str(percentLeft) + "%)")
    clustersMeansList.sort()
    clustersMediansList.sort()
    print("Mean values of valid clusters: " + str(clustersMeansList))
    print("Median values of valid clusters: " + str(clustersMediansList) + "\n")

    outliersCountFunc() # <-- outliers detection function call

## Outliers finder

In [None]:
def outliersCountFunc():
    """Report runs of neighbouring leftover notes as "possible expressive areas".

    Steps:
      1. Drop from ``pdNotesList`` every row whose IOI lies within +/-2 of one
         third (rounded) of a value in ``clustersMeansList``.
      2. Move the old row index into an ``ID`` column and run
         ``DBSCAN(eps=2, min_samples=2)`` on those IDs, so rows get a common
         label when their IDs are close together; -1 marks isolated rows.
      3. Walk the rows in order, recording for each labelled run the onset
         (``tDtNoteOn``) of its first row and the end (``tDtNoteOn + noteLng``)
         of its last row.
      4. Print the runs converted to minutes/seconds with
         ``tempo / ticksPerBeat / 1000000`` seconds per tick. A new area is only
         announced when it starts more than 500 ticks after the previous run ended.

    ``pdNotesList`` (global) is replaced by the filtered, re-indexed and
    re-labelled frame. Prints "No expressive deviation was detected" when no run
    is found, including when no rows are left after step 1.
    """
    global pdNotesList # <-- global: Pandas dataframe based on notesList

    mergeGapTicks = 500  # runs closer than this are reported as one area

    excludedIois = []
    for clusterMean in clustersMeansList:
        third = round(clusterMean * 1 / 3)
        excludedIois.extend(range(third - 2, third + 3))

    # reset_index() returns a new frame, so nothing is modified through a slice
    pdNotesList = (
        pdNotesList[~pdNotesList['IOI'].isin(excludedIois)]
        .reset_index()
        .rename(columns={'index': 'ID'})
    )

    if pdNotesList.empty:
        # DBSCAN.fit() raises on an empty sample set
        print("No expressive deviation was detected")
        return

    # DBSCAN expects a 2-D (n_samples, n_features) array; the single feature is the row ID
    idSamples = pdNotesList['ID'].to_numpy().reshape(-1, 1)
    pdNotesList['cluster'] = DBSCAN(eps=2, min_samples=2).fit(idSamples).labels_

    runLabels = pdNotesList['cluster'].tolist()
    onsetTicks = pdNotesList['tDtNoteOn'].tolist()
    noteLengths = pdNotesList['noteLng'].tolist()

    def noteEndTick(row):
        length = noteLengths[row]
        # a note that never received its note-off still carries the "tbp"
        # placeholder; use its onset as the end instead of failing in int()
        return int(onsetTicks[row]) + (0 if isinstance(length, str) else int(length))

    clusterOutliersStart = []
    clusterOutliersStop = []
    lastRow = len(runLabels) - 1

    for i, label in enumerate(runLabels):
        if i == 0:
            if label != -1:
                clusterOutliersStart.append(int(onsetTicks[i]))
            continue

        previous = runLabels[i - 1]
        if i == lastRow and label != -1 and previous != -1:
            # the frame ends inside a run: close it with the last row
            clusterOutliersStop.append(noteEndTick(i))
        elif label != -1 and label != previous:
            if previous != -1:
                # one run directly follows another: close the earlier one first
                clusterOutliersStop.append(noteEndTick(i - 1))
            clusterOutliersStart.append(int(onsetTicks[i]))
        elif label == -1 and previous != -1:
            clusterOutliersStop.append(noteEndTick(i - 1))

    secondsPerTick = tempo / ticksPerBeat / 1000000

    def clockText(tick):
        seconds = tick * secondsPerTick
        return str(int(seconds // 60)) + " minutes and " + str(round((seconds % 60), 1)) + " seconds"

    runCount = len(clusterOutliersStart)
    for i in range(runCount):
        if i == 0 and runCount == 1:
            print("Possible expressive area start at " + clockText(clusterOutliersStart[i]))
            print("End of possible expressive area at " + clockText(clusterOutliersStop[i]) + "\n")
        elif i == 0 and runCount > 1:
            print("Possible expressive area start at " + clockText(clusterOutliersStart[i]))
        elif clusterOutliersStart[i] - clusterOutliersStop[i - 1] > mergeGapTicks:
            print("End of possible expressive area at " + clockText(clusterOutliersStop[i - 1]) + "\n")
            print("Possible expressive area start at " + clockText(clusterOutliersStart[i]))
        if i == (runCount - 1) and i > 0:
            print("End of possible expressive area at " + clockText(clusterOutliersStop[i]) + "\n")

    if not clusterOutliersStart:
        print("No expressive deviation was detected")

In [None]:
def genFunc(midiFilePath):
    """Run the complete analysis on the MIDI file at ``midiFilePath``.

    The stages talk to each other through module globals, so they have to run
    in exactly this order; nothing is returned.
    """
    # 1. load the file (sets rollMidiFile and friends)
    importMidi(midiFilePath)

    # 2. turn the loaded file into the note list
    dfCreate(rollMidiFile)

    # 3. (re)initialise the tunable parameters for this file
    paramsDef()

    # 4. mark "sync" notes and build the dataframes
    syncDef(notesList)

    # 5. initialise the clustering state, then cluster iteratively and report
    preDbScanFunc()
    iterativeDbscan()

In [None]:
# Batch driver: ask for a folder and analyse every MIDI file found in it.
folderPath = input("Please type the path of the MIDI Files folder: ")

# os.path.isdir() reports a bad path through its return value (it does not
# raise), so keep asking until an existing folder is given.
while not os.path.isdir(folderPath):
    print("Invalid path. Please type a valid path: ")
    folderPath = input()

# sorted() makes the processing order reproducible; os.listdir() order is arbitrary
filesList = sorted(os.listdir(folderPath))

# NOTE: the loop variable must stay a module-level name called `midiFile`,
# because iterativeDbscan() prints it as the header of each report.
for midiFile in filesList:
    if os.path.splitext(midiFile)[1].lower() in (".mid", ".midi"):
        # os.path.join inserts the separator when folderPath lacks a trailing one
        midiFilePath = os.path.join(folderPath, midiFile)
        genFunc(midiFilePath)