Download from SDSSDR18 all spectra from the match

In [21]:
import os
import time
import numpy as np
import pandas as pd
from astroquery.sdss import SDSS
from astropy.io import fits
from astropy.table import Table
from astroquery.gaia import Gaia

In [17]:
crossmatch = pd.read_csv('/home/thara/Big/gaia_results_all_batches.csv')
crossmatch

Unnamed: 0,SOURCE_ID,original_ext_source_id
0,3647851003078664832,1237648703516442877
1,3661958424457651712,1237648704049840359
2,3662377883848390912,1237648704586973290
3,4419851136648786304,1237648705670152326
4,4407884120813978752,1237648705678213667
...,...,...
133828,1779912581508627072,1237680273108960003
133829,309194386402373120,1237680285996155390
133830,1883704073988238336,1237680303715582376
133831,2866659811294043904,1237680308016841309


In [16]:
# Initialize an empty DataFrame to store results
spec_df = pd.DataFrame()

# Define a small batch size for querying
batch_size = 50  # Small batch size to avoid exceeding query limits
max_retries = 3  # Number of retries for failed batches

# Process bestObjIDs in small batches
best_obj_ids = crossmatch['original_ext_source_id'].unique()
for batch_index, i in enumerate(range(0, len(best_obj_ids), batch_size), start=1):
    batch = best_obj_ids[i:i + batch_size]
    batch_ids = ','.join(map(str, batch))  # Properly format IDs
    
    query = f"""
    SELECT plate, mjd, fiberid, bestObjID
    FROM SpecObj
    WHERE bestObjID IN ({batch_ids})
    """
    
    retries = 0
    success = False
    
    while retries < max_retries and not success:
        try:
            # Query the SDSS database
            result = SDSS.query_sql(query)
            if result is not None:
                # Append results to the DataFrame
                spec_df = pd.concat([spec_df, result.to_pandas()], ignore_index=True)
            
            # Print progress every 100 batches
            if batch_index % 100 == 0 or batch_index == 1:
                print(f"Processed batch {batch_index}/{len(best_obj_ids) // batch_size + 1}.")
            
            success = True  # Mark this batch as successful
        except Exception as e:
            retries += 1
            print(f"Error processing batch {i} to {i + len(batch)}: {e}. Retrying ({retries}/{max_retries})...")
            time.sleep(2)  # Add a short delay before retrying

    if not success:
        print(f"Batch {i} to {i + len(batch)} failed after {max_retries} retries.")

# Save the results to a file
spec_df.to_csv("specobj_results.csv", index=False)
print("Query completed. Results saved to 'specobj_results.csv'.")

Processed batch 1/2676.
Processed batch 100/2676.
Processed batch 200/2676.
Processed batch 300/2676.
Processed batch 400/2676.
Processed batch 500/2676.
Processed batch 600/2676.
Processed batch 700/2676.
Processed batch 800/2676.
Processed batch 900/2676.
Processed batch 1000/2676.
Processed batch 1100/2676.
Processed batch 1200/2676.
Processed batch 1300/2676.
Processed batch 1400/2676.
Processed batch 1500/2676.
Processed batch 1600/2676.
Processed batch 1700/2676.
Processed batch 1800/2676.
Processed batch 1900/2676.
Processed batch 2000/2676.
Processed batch 2100/2676.
Processed batch 2200/2676.
Processed batch 2300/2676.
Processed batch 2400/2676.
Processed batch 2500/2676.
Processed batch 2600/2676.
Query completed. Results saved to 'specobj_results.csv'.


In [14]:
metadata_df = pd.read_csv('/home/thara/Big/specobj_results.csv')
metadata_df

Unnamed: 0,plate,mjd,fiberid,bestObjID
0,303,51615,205,1237648703516442877
1,299,51671,179,1237648704049840359
2,300,51943,309,1237648704586973290
3,311,51665,443,1237648705670152326
4,364,52000,380,1237648705678213667
...,...,...,...,...
133789,7572,56944,710,1237680273108960003
133790,6265,56248,512,1237680285996155390
133791,6295,56536,434,1237680303715582376
133792,6515,56536,932,1237680308016841309


In [18]:
# Check for duplicates in original_ext_source_id
duplicate_ids = crossmatch[crossmatch.duplicated(subset='original_ext_source_id', keep=False)]

if duplicate_ids.empty:
    print("All IDs in 'original_ext_source_id' are unique.")
else:
    print(f"Found {len(duplicate_ids)} duplicate entries in 'original_ext_source_id'.")
    print("Duplicate entries:")
    print(duplicate_ids)

Found 77 duplicate entries in 'original_ext_source_id'.
Duplicate entries:
                  SOURCE_ID  original_ext_source_id
91       921068247868697728     1237653587407536295
92       921068243577055232     1237653587407536295
349      706252331821810944     1237660961862189211
350      706252331823294464     1237660961862189211
406     1541291590181192320     1237661852007727118
...                     ...                     ...
131800   147929260770079872     1237660558135656804
132095   148055807685252352     1237660559209332868
132096   148055807686085120     1237660559209332868
132667  3706417688227261568     1237661970652397647
132668  3706417692523377408     1237661970652397647

[77 rows x 2 columns]


The `Download.py` script should be used to download the spectra from SDSS.

In [3]:
# Define directories
fits_dir = "/home/thara/Big/Spectra"  # Directory with FITS files
parquet_dir = "/home/thara/Big/Spectra_Parquet"  # Directory for Parquet files
os.makedirs(parquet_dir, exist_ok=True)  # Ensure the Parquet directory exists

# Load the specObj list
spectra_info = '/home/thara/Big/specobj_results.csv'  # Replace with your file

# Define batch size and retry settings
batch_size = 1000  # Number of spectra to process in one batch
max_retries = 3  # Number of retries for failed downloads

# Process existing FITS files
print("Processing existing FITS files...")
for fits_file in os.listdir(fits_dir):
    if fits_file.endswith(".fits"):
        # Extract plate, mjd, and fiberid from the filename
        file_parts = fits_file.split("_")
        plate = file_parts[1][5:]
        mjd = file_parts[2][3:]
        fiberid = file_parts[3][5:].replace(".fits", "")
        
        # Define Parquet filename
        parquet_file = os.path.join(parquet_dir, f"spectrum_plate{plate}_mjd{mjd}_fiber{fiberid}.parquet")
        
        # Skip if Parquet file already exists
        if os.path.exists(parquet_file):
            continue
        
        # Open FITS file and extract data
        fits_path = os.path.join(fits_dir, fits_file)
        try:
            with fits.open(fits_path) as hdul:
                data = Table(hdul[1].data)
                
                # Convert to native byte order
                flux = np.array(data["flux"]).byteswap().newbyteorder()
                loglam = np.array(data["loglam"]).byteswap().newbyteorder()
            
            # Save to Parquet
            df = pd.DataFrame({"loglam": loglam, "flux": flux})
            df.to_parquet(parquet_file, index=False)
            
            # Delete the FITS file
            os.remove(fits_path)
            #print(f"Processed and deleted: {fits_file}")
        except Exception as e:
            print(f"Error processing {fits_file}: {e}")
print("Finished processing existing FITS files.")

Processing existing FITS files...
Finished processing existing FITS files.


In [None]:
# Define directories
parquet_dir = "/home/thara/Big/Spectra_Parquet"  # Directory for Parquet files

# Load the DataFrame with bestObjID, plate, mjd, and fiberid
metadata_df = pd.read_csv('/home/thara/Big/specobj_results.csv')  # Replace with your metadata file

# Initialize a list to store results
results = []

# Process each Parquet file
for parquet_file in os.listdir(parquet_dir):
    if parquet_file.endswith(".parquet"):
        # Extract plate, mjd, and fiberid from the filename
        file_parts = parquet_file.split("_")
        plate = int(file_parts[1][5:])
        mjd = int(file_parts[2][3:])
        fiberid = int(file_parts[3][5:].replace(".parquet", ""))
        
        # Load the Parquet file
        parquet_path = os.path.join(parquet_dir, parquet_file)
        try:
            data = pd.read_parquet(parquet_path)
            
            # Convert loglam to lam
            data['lam'] = 10 ** data['loglam']
            
            # Define wavelength ranges
            range1 = (3850, 6800)  # First range
            range2 = (6400, 9150)  # Second range
            
            # Calculate integrated flux for each range
            mask1 = (data['lam'] >= range1[0]) & (data['lam'] <= range1[1])
            mask2 = (data['lam'] >= range2[0]) & (data['lam'] <= range2[1])
            
            flux1 = np.trapz(data.loc[mask1, 'flux'], data.loc[mask1, 'lam'])
            flux2 = np.trapz(data.loc[mask2, 'flux'], data.loc[mask2, 'lam'])
            
            # Calculate the flux ratio
            flux_ratio = flux1 / flux2 if flux2 != 0 else np.nan
            
            # Find the bestObjID from metadata
            matched_row = metadata_df[
                (metadata_df['plate'] == plate) & 
                (metadata_df['mjd'] == mjd) & 
                (metadata_df['fiberid'] == fiberid)
            ]
            
            if not matched_row.empty:
                bestObjID = matched_row['bestObjID'].values[0]
                results.append({'bestObjID': bestObjID, 'plate': plate, 'mjd': mjd, 'fiberid': fiberid, 'flux_ratio': flux_ratio})
            else:
                print(f"No match found for Parquet file: {parquet_file}")
        except Exception as e:
            print(f"Error processing {parquet_file}: {e}")

# Create a DataFrame with the results
results_df = pd.DataFrame(results)

# Save the results to a CSV file
results_df.to_csv("/home/thara/Big/flux_ratios.csv", index=False)

print("Flux ratios calculated and saved to 'flux_ratios.csv'.")

Flux ratios calculated and saved to 'flux_ratios.csv'.


In [15]:
results_df

Unnamed: 0,bestObjID,plate,mjd,fiberid,flux_ratio
0,1237663784201486401,7868,57006,816,2.177181
1,1237665548902859021,2202,53566,290,2.016669
2,1237654380901957961,7512,56777,134,1.728434
3,1237655693011845153,918,52404,515,1.013675
4,1237667322185580645,2236,53729,1,0.747908
...,...,...,...,...,...
133789,1237680273108960003,7572,56944,710,1.510391
133790,1237680285996155390,6265,56248,512,1.797892
133791,1237680303715582376,6295,56536,434,0.798917
133792,1237680308016841309,6515,56536,932,1.233320


In [19]:
# Assuming `duplicate_ids` contains the duplicate 'original_ext_source_id'
duplicate_ids_set = set(duplicate_ids['original_ext_source_id'])

# Remove rows from results_df where 'bestObjID' is in duplicate_ids
filtered_results_df = results_df[~results_df['bestObjID'].isin(duplicate_ids_set)]

# Check the number of rows before and after filtering
print(f"Original number of rows: {len(results_df)}")
print(f"Filtered number of rows: {len(filtered_results_df)}")

Original number of rows: 133794
Filtered number of rows: 133756


In [20]:
# Merge the two DataFrames on 'bestObjID' (filtered_results_df) and 'original_ext_source_id' (crossmatch)
merged_results_df = filtered_results_df.merge(
    crossmatch[['SOURCE_ID', 'original_ext_source_id']],
    left_on='bestObjID', 
    right_on='original_ext_source_id',
    how='left'  # Use 'left' to retain all rows in filtered_results_df
)

# Drop the 'original_ext_source_id' column if it's no longer needed
merged_results_df = merged_results_df.drop(columns=['original_ext_source_id'])
merged_results_df

Unnamed: 0,bestObjID,plate,mjd,fiberid,flux_ratio,SOURCE_ID
0,1237663784201486401,7868,57006,816,2.177181,2543193557806316800
1,1237665548902859021,2202,53566,290,2.016669,4463606678617356800
2,1237654380901957961,7512,56777,134,1.728434,1014204471947028992
3,1237655693011845153,918,52404,515,1.013675,3652568217199181440
4,1237667322185580645,2236,53729,1,0.747908,3961435288437544320
...,...,...,...,...,...,...
133751,1237680273108960003,7572,56944,710,1.510391,1779912581508627072
133752,1237680285996155390,6265,56248,512,1.797892,309194386402373120
133753,1237680303715582376,6295,56536,434,0.798917,1883704073988238336
133754,1237680308016841309,6515,56536,932,1.233320,2866659811294043904


Now we will proceed to download the GAIA data.

In [22]:
# Configuration
batch_size = 1000  # Number of IDs per batch
output_file = "/home/thara/Big/gaia_phot_flux_results.csv"  # Output file for results
failed_batches_file = "/home/thara/Big/failed_batches.txt"  # Log file for failed batches
max_retries = 3  # Number of retries for failed batches

# Load the DataFrame containing SOURCE_ID
source_ids = merged_results_df['SOURCE_ID'].dropna().unique()

# Main batch processing loop
for i in range(0, len(source_ids), batch_size):
    # Create the batch query
    batch_ids = "','".join(map(str, source_ids[i:i + batch_size]))
    
    query = f"""
    SELECT source_id, phot_bp_mean_flux, phot_rp_mean_flux
    FROM gaiadr3.gaia_source
    WHERE source_id IN ('{batch_ids}')
    """
    
    retries = 0
    success = False

    while retries < max_retries and not success:
        try:
            print(f"Processing batch {i // batch_size + 1} ({i} to {i + batch_size} IDs)... Attempt {retries + 1}")
            job = Gaia.launch_job(query)
            batch_results = job.get_results().to_pandas()
            print(f"Batch {i // batch_size + 1} returned {len(batch_results)} rows.")
            
            # Reload existing results
            existing_results = pd.read_csv(output_file) if os.path.exists(output_file) else pd.DataFrame()
            
            # Combine results and remove duplicates
            combined_results = pd.concat([existing_results, batch_results], ignore_index=True).drop_duplicates()
            
            # Save the updated results
            combined_results.to_csv(output_file, index=False)
            print(f"Batch {i // batch_size + 1} completed. Total rows saved: {len(combined_results)}")
            
            success = True  # Mark batch as successful
        except Exception as e:
            retries += 1
            print(f"Error processing batch {i} to {i + batch_size}: {e}")
    
    if not success:
        # Log failed batch to file
        with open(failed_batches_file, "a") as log_file:
            log_file.write(f"{i},{i + batch_size}\n")
        print(f"Batch {i} to {i + batch_size} failed after {max_retries} retries. Moving to the next batch.")

# Reprocess failed batches
print("Reprocessing failed batches...")
try:
    with open(failed_batches_file, "r") as log_file:
        failed_batches = log_file.readlines()
except FileNotFoundError:
    failed_batches = []

for batch_range in failed_batches:
    start, end = map(int, batch_range.strip().split(","))
    batch_ids = "','".join(map(str, source_ids[start:end]))
    query = f"""
    SELECT source_id, phot_bp_mean_flux, phot_rp_mean_flux
    FROM gaiadr3.gaia_source
    WHERE source_id IN ('{batch_ids}')
    """
    try:
        print(f"Reprocessing batch {start} to {end}...")
        job = Gaia.launch_job(query)
        batch_results = job.get_results().to_pandas()
        
        # Reload existing results
        existing_results = pd.read_csv(output_file) if os.path.exists(output_file) else pd.DataFrame()
        
        # Combine results and remove duplicates
        combined_results = pd.concat([existing_results, batch_results], ignore_index=True).drop_duplicates()
        
        # Save the updated results
        combined_results.to_csv(output_file, index=False)
        print(f"Batch {start} to {end} reprocessed successfully. Total rows saved: {len(combined_results)}")
        
        # Remove successfully reprocessed batch from log
        with open(failed_batches_file, "r") as log_file:
            lines = log_file.readlines()
        with open(failed_batches_file, "w") as log_file:
            log_file.writelines(line for line in lines if line.strip() != f"{start},{end}")
    except Exception as e:
        print(f"Failed to reprocess batch {start} to {end}: {e}")

# Final message
print("All processing completed.")


Processing batch 1 (0 to 1000 IDs)... Attempt 1
Batch 1 returned 1000 rows.
Batch 1 completed. Total rows saved: 1000
Processing batch 2 (1000 to 2000 IDs)... Attempt 1
Batch 2 returned 1000 rows.
Batch 2 completed. Total rows saved: 2000
Processing batch 3 (2000 to 3000 IDs)... Attempt 1
Batch 3 returned 1000 rows.
Batch 3 completed. Total rows saved: 3000
Processing batch 4 (3000 to 4000 IDs)... Attempt 1
Batch 4 returned 1000 rows.
Batch 4 completed. Total rows saved: 4000
Processing batch 5 (4000 to 5000 IDs)... Attempt 1
Batch 5 returned 1000 rows.
Batch 5 completed. Total rows saved: 5000
Processing batch 6 (5000 to 6000 IDs)... Attempt 1
Batch 6 returned 1000 rows.
Batch 6 completed. Total rows saved: 6000
Processing batch 7 (6000 to 7000 IDs)... Attempt 1
Batch 7 returned 1000 rows.
Batch 7 completed. Total rows saved: 7000
Processing batch 8 (7000 to 8000 IDs)... Attempt 1
Batch 8 returned 1000 rows.
Batch 8 completed. Total rows saved: 8000
Processing batch 9 (8000 to 9000 ID

In [23]:
combined_results

Unnamed: 0,SOURCE_ID,phot_bp_mean_flux,phot_rp_mean_flux
0,88751204298752,97.302780,80.055475
1,190009353405440,119.288407,95.199147
2,5054111356737024,118.767233,183.231920
3,96986889894275200,57.062729,85.158148
4,100503746555005056,82.720489,72.286897
...,...,...,...
133751,5180145968212943744,563.933805,623.663854
133752,5186193389540051200,88.635141,93.043727
133753,5186205720391038336,78.457424,86.174115
133754,6898156118091837056,94.715035,125.106882


In [24]:
# Merge the DataFrames on the SOURCE_ID column
final_results_df = merged_results_df.merge(
    combined_results,
    on="SOURCE_ID",  # Join on SOURCE_ID
    how="inner"  # Use inner join to keep only matching rows
)

# Print the number of rows and a preview
print(f"Joined DataFrame has {len(final_results_df)} rows.")
print(final_results_df.head())

Joined DataFrame has 133756 rows.
             bestObjID  plate    mjd  fiberid  flux_ratio  \
0  1237663784201486401   7868  57006      816    2.177181   
1  1237665548902859021   2202  53566      290    2.016669   
2  1237654380901957961   7512  56777      134    1.728434   
3  1237655693011845153    918  52404      515    1.013675   
4  1237667322185580645   2236  53729        1    0.747908   

             SOURCE_ID  phot_bp_mean_flux  phot_rp_mean_flux  
0  2543193557806316800         771.161658         812.454893  
1  4463606678617356800         230.410150         216.745479  
2  1014204471947028992          59.991009          88.311689  
3  3652568217199181440         350.539988         705.208239  
4  3961435288437544320         114.175541         260.146202  


In [25]:
# Ensure the required columns exist
if 'phot_bp_mean_flux' in final_results_df.columns and 'phot_rp_mean_flux' in final_results_df.columns:
    # Calculate the photometric flux ratio
    final_results_df['photometric_flux_ratio'] = (
        final_results_df['phot_bp_mean_flux'] / final_results_df['phot_rp_mean_flux']
    )
    
    # Handle cases where grp_flux is zero to avoid division by zero errors
    final_results_df['photometric_flux_ratio'] = final_results_df['photometric_flux_ratio'].replace([np.inf, -np.inf], np.nan)
    
    print("Photometric flux ratio calculated and added to the DataFrame.")
else:
    print("Required columns 'phot_bp_mean_flux' and 'phot_rp_mean_flux' are missing.")

# Save the updated DataFrame
final_results_df.to_csv("/home/thara/Big/final_results.csv", index=False)

# Preview the updated DataFrame
final_results_df


Photometric flux ratio calculated and added to the DataFrame.


Unnamed: 0,bestObjID,plate,mjd,fiberid,flux_ratio,SOURCE_ID,phot_bp_mean_flux,phot_rp_mean_flux,photometric_flux_ratio
0,1237663784201486401,7868,57006,816,2.177181,2543193557806316800,771.161658,812.454893,0.949175
1,1237665548902859021,2202,53566,290,2.016669,4463606678617356800,230.410150,216.745479,1.063045
2,1237654380901957961,7512,56777,134,1.728434,1014204471947028992,59.991009,88.311689,0.679310
3,1237655693011845153,918,52404,515,1.013675,3652568217199181440,350.539988,705.208239,0.497073
4,1237667322185580645,2236,53729,1,0.747908,3961435288437544320,114.175541,260.146202,0.438890
...,...,...,...,...,...,...,...,...,...
133751,1237680273108960003,7572,56944,710,1.510391,1779912581508627072,98.424982,43.140794,2.281483
133752,1237680285996155390,6265,56248,512,1.797892,309194386402373120,48.999154,76.712520,0.638737
133753,1237680303715582376,6295,56536,434,0.798917,1883704073988238336,103.721589,334.565475,0.310019
133754,1237680308016841309,6515,56536,932,1.233320,2866659811294043904,164.762430,69.931613,2.356051


Now we will proceed to the analysis of the data.